[ https://issues.apache.org/jira/browse/HADOOP-18410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585274#comment-17585274 ]

ASF GitHub Bot commented on HADOOP-18410:
-----------------------------------------

mehakmeet commented on code in PR #4766:
URL: https://github.com/apache/hadoop/pull/4766#discussion_r955790641


##########
hadoop-tools/hadoop-aws/src/test/resources/core-site.xml:
##########
@@ -184,12 +184,17 @@
     <value>true</value>
   </property>
 
+

Review Comment:
   nit: extra blank line



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestSDKStreamDrainer.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import com.amazonaws.internal.SdkFilterInputStream;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DRAIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_INPUT_STREAM_STATISTICS;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for stream draining.
+ */
+public class TestSDKStreamDrainer extends HadoopTestBase {
+
+  public static final int BYTES = 100;
+
+  /**
+   * Aborting does as asked.
+   */
+  @Test
+  public void testDrainerAborted() throws Throwable {
+    assertAborted(drainer(BYTES, true, stream()));
+  }
+
+  /**
+   * Create a stream of the default length.
+   * @return a stream.
+   */
+  private static FakeSDKInputStream stream() {
+    return new FakeSDKInputStream(BYTES);
+  }
+
+  /**
+   * A normal drain; all bytes are read. No abort.
+   */
+  @Test
+  public void testDrainerDrained() throws Throwable {
+    assertBytesReadNotAborted(
+        drainer(BYTES, false, stream()),
+        BYTES);
+  }
+
+  /**
+   * Empty streams are fine.
+   */
+  @Test
+  public void testEmptyStream() throws Throwable {
+    int size = 0;
+    assertBytesReadNotAborted(
+        drainer(size, false, new FakeSDKInputStream(size)),
+        size);
+  }
+
+  /**
+   * Single char read; more a safety check on the test stream than
+   * on the production code.
+   */
+  @Test
+  public void testSingleChar() throws Throwable {
+    int size = 1;
+    assertBytesReadNotAborted(
+        drainer(size, false, new FakeSDKInputStream(size)),
+        size);
+  }
+
+  /**
+   * A read spanning multiple buffers.
+   */
+  @Test
+  public void testMultipleBuffers() throws Throwable {
+    int size = DRAIN_BUFFER_SIZE + 1;
+    assertBytesReadNotAborted(
+        drainer(size, false, new FakeSDKInputStream(size)),
+        size);
+  }
+
+  /**
+   * Read of exactly one buffer.
+   */
+  @Test
+  public void testExactlyOneBuffer() throws Throwable {
+    int size = DRAIN_BUFFER_SIZE;
+    assertBytesReadNotAborted(
+        drainer(size, false, new FakeSDKInputStream(size)),
+        size);
+  }
+
+  /**
+   * Less data than expected came back; not escalated.
+   */
+  @Test
+  public void testStreamUnderflow() throws Throwable {
+    int size = 50;
+    assertBytesReadNotAborted(
+        drainer(BYTES, false, new FakeSDKInputStream(size)),
+        size);
+  }
+
+  /**
+   * Test a drain where a read triggers an IOE; this must escalate
+   * to an abort.
+   */
+  @Test
+  public void testReadFailure() throws Throwable {
+    int threshold = 50;
+    SDKStreamDrainer drainer = new SDKStreamDrainer("s3://example/",
+        null,
+        new FakeSDKInputStream(BYTES, threshold),
+        false,
+        BYTES,
+        EMPTY_INPUT_STREAM_STATISTICS, "test");
+    intercept(IOException.class, "", () ->
+        drainer.applyRaisingException());
+
+    assertAborted(drainer);
+  }
+
+  /**
+   * Abort does not read(), so the exception will not surface.
+   */
+  @Test
+  public void testReadFailureDoesNotSurfaceInAbort() throws Throwable {
+    int threshold = 50;
+    SDKStreamDrainer drainer = new SDKStreamDrainer("s3://example/",
+        null,
+        new FakeSDKInputStream(BYTES, threshold),
+        true,
+        BYTES,
+        EMPTY_INPUT_STREAM_STATISTICS, "test");
+    drainer.applyRaisingException();
+
+    assertAborted(drainer);
+  }
+
+  /**
+   * Make sure the underlying stream read code works.
+   */
+  @Test
+  public void testFakeStreamRead() throws Throwable {
+    FakeSDKInputStream stream = stream();
+    int count = 0;
+    while (stream.read() > 0) {
+      count++;
+    }
+    Assertions.assertThat(count)
+        .describedAs("bytes read from %s", stream)
+        .isEqualTo(BYTES);
+  }
+
+  /**
+   * Create a drainer and invoke it, rethrowing any exception
+   * which occurred during the draining.
+   * @param remaining bytes remaining in the stream
+   * @param shouldAbort should we abort?
+   * @param in input stream.
+   * @return the drainer
+   * @throws Throwable something went wrong
+   */
+  private SDKStreamDrainer drainer(int remaining,
+      boolean shouldAbort,
+      FakeSDKInputStream in) throws Throwable {
+    SDKStreamDrainer drainer = new SDKStreamDrainer("s3://example/",
+        null,
+        in,
+        shouldAbort,
+        remaining,
+        EMPTY_INPUT_STREAM_STATISTICS, "test");
+    drainer.applyRaisingException();
+    return drainer;
+  }
+
+
+  /**
+   * The draining aborted.
+   * @param drainer drainer to assert on.
+   * @return the drainer.
+   */
+  private SDKStreamDrainer assertAborted(SDKStreamDrainer drainer) {
+    Assertions.assertThat(drainer)
+        .matches(SDKStreamDrainer::isAborted, "isAborted");
+    return drainer;
+  }
+
+  /**
+   * The draining was not aborted.
+   * @param drainer drainer to assert on.
+   * @return the drainer.
+   */
+  private SDKStreamDrainer assertNotAborted(SDKStreamDrainer drainer) {
+    Assertions.assertThat(drainer)
+        .matches(d -> !d.isAborted(), "is not aborted");
+    return drainer;
+  }
+
+  /**
+   * The draining was not aborted and {@code bytes} were read.
+   * @param drainer drainer to assert on.
+   * @param bytes expected byte count
+   * @return the drainer.
+   */
+  private SDKStreamDrainer assertBytesReadNotAborted(SDKStreamDrainer drainer,
+      int bytes) {
+    return assertBytesRead(assertNotAborted(drainer), bytes);
+  }
+
+  /**
+   * Assert {@code bytes} were read.
+   * @param drainer drainer to assert on.
+   * @param bytes expected byte count
+   * @return the drainer.
+   */
+  private static SDKStreamDrainer assertBytesRead(final SDKStreamDrainer drainer,
+      final int bytes) {
+    Assertions.assertThat(drainer)

Review Comment:
   Add an error message to this assertion in case of failure, e.g. "Bytes read are not as expected".
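   A hedged sketch of how that might look, assuming the drainer exposes
   the drained byte count via a getter (`getDrained()` is an assumed name
   here):

   ```java
   Assertions.assertThat(drainer.getDrained())
       .describedAs("bytes read are not as expected for %s", drainer)
       .isEqualTo(bytes);
   ```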



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
+import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
+import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+/**
+ * Test stream unbuffer performance/behavior with stream draining
+ * and aborting.
+ */
+public class ITestUnbufferDraining extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestUnbufferDraining.class);
+
+  public static final int READAHEAD = 1000;
+
+  public static final int FILE_SIZE = 50_000;
+
+  public static final int ATTEMPTS = 10;
+
+  private FileSystem brittleFS;
+
+  /**
+   * Create with markers kept, always.
+   */
+  public ITestUnbufferDraining() {
+    super(false);
+  }
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    removeBaseAndBucketOverrides(conf,
+        ASYNC_DRAIN_THRESHOLD,
+        ESTABLISH_TIMEOUT,
+        INPUT_FADVISE,
+        MAX_ERROR_RETRIES,
+        MAXIMUM_CONNECTIONS,
+        PREFETCH_ENABLED_KEY,
+        READAHEAD_RANGE,
+        REQUEST_TIMEOUT,
+        RETRY_LIMIT,
+        SOCKET_TIMEOUT);
+
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+
+    // now create a new FS with minimal http capacity and recovery;
+    // a separate one is used to avoid test teardown suffering
+    // from the lack of http connections and short timeouts.
+    Configuration conf = getConfiguration();
+    // kick off async drain for any data
+    conf.setInt(ASYNC_DRAIN_THRESHOLD, 1);
+    conf.setInt(MAXIMUM_CONNECTIONS, 2);
+    conf.setInt(MAX_ERROR_RETRIES, 1);
+    conf.setInt(ESTABLISH_TIMEOUT, 1000);
+    conf.setInt(READAHEAD_RANGE, READAHEAD);
+    conf.setInt(RETRY_LIMIT, 1);
+
+    brittleFS = FileSystem.newInstance(getFileSystem().getUri(), conf);
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    IOUtils.cleanupWithLogger(LOG, brittleFS);
+  }
+
+  public FileSystem getBrittleFS() {
+    return brittleFS;
+  }
+
+  /**
+   * Test stream close performance/behavior with stream draining
+   * and unbuffer.
+   */
+  @Test
+  public void testUnbufferDraining() throws Throwable {
+
+    describe("unbuffer draining");
+    FileStatus st = createTestFile();
+
+    int offset = FILE_SIZE - READAHEAD + 1;
+    try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
+        .withFileStatus(st)
+        .must(ASYNC_DRAIN_THRESHOLD, 1)
+        .build().get()) {
+      describe("Initiating unbuffer with async drain\n");
+      for (int i = 0; i < ATTEMPTS; i++) {
+        describe("Starting read/unbuffer #%d", i);
+        in.seek(offset);
+        in.read();
+        in.unbuffer();
+      }
+    }
+  }
+
+  /**
+   * Test stream close performance/behavior with stream draining
+   * and unbuffer.
+   */
+  @Test
+  public void testUnbufferAborting() throws Throwable {
+
+    describe("unbuffer draining");
+    FileStatus st = createTestFile();
+
+
+    // open the file at the beginning with a whole file read policy,
+    // so even with s3a switching to random on unbuffer,
+    // this always does a full GET
+    try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
+        .withFileStatus(st)
+        .must(ASYNC_DRAIN_THRESHOLD, 1)
+        .must(FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+        .build().get()) {
+
+      describe("Initiating unbuffer with async drain\n");
+      for (int i = 0; i < ATTEMPTS; i++) {
+        describe("Starting read/unbuffer #%d", i);
+        in.read();
+        in.unbuffer();
+      }
+    }
+  }
+
+  private FileStatus createTestFile() throws IOException {
+    byte[] data = dataset(FILE_SIZE, '0', 10);
+    S3AFileSystem fs = getFileSystem();
+
+    Path path = methodPath();
+    ContractTestUtils.createFile(fs, path, true, data);
+    FileStatus st = fs.getFileStatus(path);
+    return st;

Review Comment:
   nit: just return `fs.getFileStatus(path);` directly; the local variable seems redundant.
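   For instance, applying the suggestion to the quoted method:

   ```java
   private FileStatus createTestFile() throws IOException {
     byte[] data = dataset(FILE_SIZE, '0', 10);
     S3AFileSystem fs = getFileSystem();

     Path path = methodPath();
     ContractTestUtils.createFile(fs, path, true, data);
     return fs.getFileStatus(path);
   }
   ```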



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java:
##########
@@ -0,0 +1,188 @@
+  /**
+   * Test stream close performance/behavior with stream draining
+   * and unbuffer.
+   */
+  @Test
+  public void testUnbufferAborting() throws Throwable {
+
+    describe("unbuffer draining");
+    FileStatus st = createTestFile();
+
+
+    // open the file at the beginning with a whole file read policy,
+    // so even with s3a switching to random on unbuffer,
+    // this always does a full GET
+    try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
+        .withFileStatus(st)
+        .must(ASYNC_DRAIN_THRESHOLD, 1)
+        .must(FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+        .build().get()) {
+
+      describe("Initiating unbuffer with async drain\n");
+      for (int i = 0; i < ATTEMPTS; i++) {
+        describe("Starting read/unbuffer #%d", i);
+        in.read();
+        in.unbuffer();
+      }

Review Comment:
   After the for loop we could assert on the number of aborts collected in
   the IOStatistics (`StreamStatisticNames.STREAM_READ_ABORTED`): it should
   be 10 in this test and 0 in the test above.
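   A sketch of such a check, assuming the abort counter is visible on the
   input stream's IOStatistics and the usual assertion helper is used:

   ```java
   // after the read/unbuffer loop, while the stream is still open:
   verifyStatisticCounterValue(in.getIOStatistics(),
       StreamStatisticNames.STREAM_READ_ABORTED, ATTEMPTS);
   ```

   with `IOStatisticAssertions.verifyStatisticCounterValue` statically
   imported; the draining test above would assert 0 instead.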



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java:
##########
@@ -0,0 +1,188 @@
+  /**
+   * Test stream close performance/behavior with stream draining

Review Comment:
   Same Javadoc as the test above; is that intentional? Maybe it should
   mention aborting for this test?
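   For example, a differentiated Javadoc might read (wording is a
   suggestion only):

   ```java
   /**
    * Test stream unbuffer performance/behavior with stream aborting:
    * the whole-file read policy leaves more data outstanding than is
    * worth draining, so each unbuffer() is expected to abort the stream.
    */
   ```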



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import com.amazonaws.internal.SdkFilterInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DRAIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * Drains/aborts s3 or other AWS SDK streams.
+ * It is callable, so it can be passed directly to a submitter
+ * for async invocation.
+ * A request object may be passed in; it will be implicitly
+ * cached until this object is GCd.
+ * This is because in some versions of the AWS SDK, the S3Object
+ * has a finalize() method which releases the http connection,
+ * even when the stream is still open.
+ * See HADOOP-17338 for details.
+ */
+public class SDKStreamDrainer implements CallableRaisingIOE<Boolean> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SDKStreamDrainer.class);
+
+  /**
+   * URI for log messages.
+   */
+  private final String uri;
+
+  /**
+   * Request object; usually an S3Object.
+   * Never used, but needed to keep the http connection
+   * open long enough for draining to take place.
+   */
+  @Nullable
+  private final Closeable requestObject;
+
+  /**
+   * Stream from the {@link #requestObject} for draining and closing.
+   */
+  private final SdkFilterInputStream inner;
+
+  /**
+   * Should the request be aborted?
+   */
+  private final boolean shouldAbort;
+
+  /**
+   * How many bytes remaining?
+   * This is decremented as the stream is drained.
+   * If the stream finished before the expected
+   * remaining value was read, this will show how many
+   * bytes were still expected.
+   */
+  private int remaining;
+
+  /**
+   * Statistics to update with the duration.
+   */
+  private final S3AInputStreamStatistics streamStatistics;
+
+  /**
+   * Reason for the stream being closed; used in log messages.
+   */
+  private final String reason;
+
+  /**
+   * Has the operation executed yet?
+   */
+  private final AtomicBoolean executed = new AtomicBoolean(false);
+
+  /**
+   * Any exception caught during execution.
+   */
+  private Exception thrown;
+
+  /**
+   * Was the stream aborted?
+   */
+  private boolean aborted;
+
+  /**
+   * How many bytes were drained?
+   */
+  private int drained = 0;
+
+  /**
+   * Prepare to drain the stream.
+   * @param uri URI for messages
+   * @param requestObject http request object; needed to avoid GC issues.
+   * @param inner stream to close.
+   * @param shouldAbort force an abort; used if explicitly requested.
+   * @param remaining remaining bytes
+   * @param streamStatistics stats to update
+   * @param reason reason for stream being closed; used in messages
+   */
+  public SDKStreamDrainer(final String uri,
+      @Nullable final Closeable requestObject,
+      final SdkFilterInputStream inner,
+      final boolean shouldAbort,
+      final int remaining,
+      final S3AInputStreamStatistics streamStatistics,
+      final String reason) {
+    this.uri = uri;
+    this.requestObject = requestObject;
+    this.inner = requireNonNull(inner);
+    this.shouldAbort = shouldAbort;
+    this.remaining = remaining;
+    this.streamStatistics = requireNonNull(streamStatistics);
+    this.reason = reason;
+  }
+
+  /**
+   * Drain the stream. This method is intended to be
+   * used directly or asynchronously, and measures the
+   * duration of the operation in the stream statistics.
+   * @return was the stream aborted?
+   */
+  @Override
+  public Boolean apply() {
+    try {
+      Boolean outcome = invokeTrackingDuration(
+          streamStatistics.initiateInnerStreamClose(shouldAbort),
+          this::drainOrAbortHttpStream);
+      aborted = outcome;
+      return outcome;
+    } catch (Exception e) {
+      thrown = e;
+      return aborted;
+    }
+  }
+
+  /**
+   * Apply, raising any exception.
+   * For testing.
+   * @return the outcome.
+   * @throws Exception anything raised.
+   */
+  @VisibleForTesting
+  boolean applyRaisingException() throws Exception {
+    Boolean outcome = apply();
+    if (thrown != null) {
+      throw thrown;
+    }
+    return outcome;
+  }
+
+  /**
+   * Drain or abort the inner stream.
+   * Exceptions are saved then swallowed.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
+   */

Review Comment:
   Add a `@return` comment in the Javadocs to describe what is being returned.
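   For example, matching how `apply()` interprets the outcome:

   ```java
   /**
    * Drain or abort the inner stream.
    * Exceptions are saved then swallowed.
    * If a close() is attempted and fails, the operation escalates to
    * an abort.
    * @return true if the stream was aborted, false if it was drained.
    */
   ```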





> S3AInputStream.unbuffer() async drain not releasing http connections
> --------------------------------------------------------------------
>
>                 Key: HADOOP-18410
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18410
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.3.9
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Blocker
>              Labels: pull-request-available
>
> Impala TPC-DS setup to s3 is hitting problems with timeouts fetching http 
> connections from the s3a fs pool. Disabling s3a async drain makes this 
> problem *go away*. Assumption: either those async ops are blocking, or 
> they are not releasing references properly.
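
A sketch of the workaround described above. The assumption here (not
confirmed by this report) is that async draining is disabled by raising
the drain threshold (`ASYNC_DRAIN_THRESHOLD`, as used in the tests above)
beyond any stream size in use:

```java
// hedged sketch: with the threshold above any expected remaining byte
// count, unbuffer()/close() drains synchronously instead of handing the
// stream to an async drain that may hold on to an http connection.
Configuration conf = new Configuration();
conf.setLong(ASYNC_DRAIN_THRESHOLD, Long.MAX_VALUE);
FileSystem fs = FileSystem.newInstance(fsUri, conf); // fsUri: the s3a URI
```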


