[ https://issues.apache.org/jira/browse/HADOOP-19330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17898057#comment-17898057 ]

ASF GitHub Bot commented on HADOOP-19330:
-----------------------------------------

mukund-thakur commented on code in PR #7151:
URL: https://github.com/apache/hadoop/pull/7151#discussion_r1841153728


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -5593,6 +5594,10 @@ public boolean hasPathCapability(final Path path, final String capability)
     case AWS_S3_ACCESS_GRANTS_ENABLED:
       return s3AccessGrantsEnabled;
 
+      // stream leak detection.
+    case StreamStatisticNames.STREAM_LEAKS:

Review Comment:
   disabled because we will have multiple prefetch streams reading data?
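
(For context: a minimal, hypothetical sketch of how a client could probe this capability before relying on leak detection. The bucket name is illustrative, and the result presumably depends on which input stream implementation is active.)

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

public class StreamLeakCapabilityProbe {
  public static void main(String[] args) throws Exception {
    // illustrative bucket; any s3a:// path under the target store works
    Path path = new Path("s3a://example-bucket/");
    try (FileSystem fs = path.getFileSystem(new Configuration())) {
      // standard PathCapabilities probe; with the case above it should
      // report whether the active input stream registers a leak reporter
      boolean leakDetection =
          fs.hasPathCapability(path, StreamStatisticNames.STREAM_LEAKS);
      System.out.println("stream leak detection: " + leakDetection);
    }
  }
}
```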



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestLeakReporter.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.fs.impl.LeakReporter.THREAD_FORMAT;
+import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
+
+public final class TestLeakReporter extends AbstractHadoopTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestLeakReporter.class);
+
+  /**
+   * Count of close calls.
+   */
+  private final AtomicInteger closeCount = new AtomicInteger();
+
+  /**
+   * Big test: creates a reporter, closes it.
+   * Verifies that the error message and stack trace are printed when the
+   * source is still open, and that the close callback was invoked.
+   * <p>
+   * After the first invocation, a second invocation is ignored.
+   */
+  @Test
+  public void testLeakInvocation() throws Throwable {
+
+    final String message = "<message>";
+    final LeakReporter reporter = new LeakReporter(message,
+        () -> true,
+        this::closed);
+
+    // store the old thread name and change it,
+    // so the log test can verify that the old thread name is printed.
+    String oldName = Thread.currentThread().getName();
+    Thread.currentThread().setName("thread");
+    // Capture the logs
+    GenericTestUtils.LogCapturer logs =
+        captureLogs(LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
+    expectClose(reporter, 1);
+
+    // check the log
+    logs.stopCapturing();
+    final String output = logs.getOutput();
+    LOG.info("output of leak log is {}", output);
+
+    final String threadInfo = String.format(THREAD_FORMAT,
+        oldName,
+        Thread.currentThread().getId());
+    // log auditing
+    Assertions.assertThat(output)
+        .describedAs("output from the logs")
+        .contains("WARN")
+        .contains(message)
+        .contains(Thread.currentThread().getName())
+        .contains(threadInfo)
+        .contains("TestLeakReporter.testLeakInvocation")
+        .contains("INFO")
+        .contains("stack");
+
+    // no reentrancy
+    expectClose(reporter, 1);
+  }
+
+  /**
+   * Expect the close operation to result in
+   * the close count matching the expected value.
+   * @param reporter leak reporter
+   * @param expected expected value after the close
+   */
+  private void expectClose(final LeakReporter reporter, final int expected) {
+    reporter.close();
+    assertCloseCount(expected);
+  }
+
+  /**
+   * Close operation: increments the counter.
+   */
+  private void closed() {
+    closeCount.incrementAndGet();
+  }
+
+  /**
+   * When the source is closed, no leak cleanup takes place.
+   */
+  @Test
+  public void testLeakSkipped() throws Throwable {
+
+    final LeakReporter reporter = new LeakReporter("<message>",
+        () -> false,
+        this::closed);
+    expectClose(reporter, 0);
+  }
+
+  /**
+   * If the probe raises an exception, the exception is swallowed
+   * and the close action is never invoked.
+   */
+  @Test
+  public void testProbeFailureSwallowed() throws Throwable {
+    final LeakReporter reporter = new LeakReporter("<message>",
+        this::raiseNPE,
+        this::closed);
+    expectClose(reporter, 0);
+  }
+
+  /**
+   * Any exception raised in the close action is swallowed.
+   */
+  @Test
+  public void testCloseActionSwallowed() throws Throwable {
+    final LeakReporter reporter = new LeakReporter("<message>",
+        () -> true,
+        this::raiseNPE);
+    reporter.close();
+
+    Assertions.assertThat(reporter.isClosed())
+        .describedAs("reporter closed")
+        .isTrue();
+  }
+
+  /**
+   * Always raises an NPE.
+   * @return never
+   */
+  private boolean raiseNPE() {
+    throw new NullPointerException("oops");
+  }
+
+  /**
+   * Assert that the value of {@link #closeCount} is as expected.
+   * @param ex expected.
+   */
+  private void assertCloseCount(final int ex) {
+    Assertions.assertThat(closeCount.get())
+        .describedAs("close count")
+        .isEqualTo(ex);
+  }
+}

Review Comment:
   nit: checkstyle, trailing blank line
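
(Aside: to make the contract exercised by this test easier to follow, here is a hedged sketch of how an owning resource might wire up a `LeakReporter`, assuming only the constructor shape the test uses: a message, an "is it still open?" probe, and a close action. The class and method names are illustrative, not the actual S3AInputStream wiring.)

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.fs.impl.LeakReporter;

/**
 * Illustrative owner of a LeakReporter.
 * The reporter is built with a message, a probe answering
 * "is the resource still open?", and the cleanup callback.
 */
public class LeakCheckedResource implements AutoCloseable {

  private final AtomicBoolean open = new AtomicBoolean(true);

  private final LeakReporter leakReporter = new LeakReporter(
      "Resource was not closed",   // logged at WARN if leaked
      open::get,                   // probe: only report while still open
      this::releaseResources);     // close action: free the resource

  @Override
  public void close() {
    releaseResources();
  }

  private void releaseResources() {
    open.set(false);
  }

  @SuppressWarnings("deprecation")
  @Override
  protected void finalize() {
    // invoked on GC of a leaked instance; if close() was already called
    // the probe returns false and nothing is reported (see testLeakSkipped)
    leakReporter.close();
  }
}
```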



##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md:
##########
@@ -485,6 +485,90 @@ If `storediag` doesn't connect to your S3 store, *nothing else will*.
 Based on the experience of people who field support calls, here are
 some of the main connectivity issues which cause problems.
 
+### <a name="Not-enough-connections"></a> Connection pool overloaded
+
+If more connections are needed than the HTTP connection pool has,
+then worker threads will block until one is freed.
+
+If the wait exceeds the time set in `fs.s3a.connection.acquisition.timeout`,
+the operation will fail with `Timeout waiting for connection from pool`.
+
+This may be retried, but time has been lost, which results in slower operations.
+If queries suddenly get slower as the number of active operations increases,
+then this is a possible cause.
+
+Fixes:
+
+Increase the value of `fs.s3a.connection.maximum`.
+This is the general fix on query engines such as Apache Spark and Apache Impala,
+which run many worker threads simultaneously and do not keep files open past
+the duration of a single task within a larger query.
+
+It can also surface with applications which deliberately keep files open
+for extended periods.
+These should ideally call `unbuffer()` on the input streams.
+This will free up the connection until another read operation is invoked, yet
+still re-open faster than if `open(Path)` were invoked.
+
+Applications may also be "leaking" HTTP connections by failing to
+`close()` them. This is potentially fatal: eventually the connection pool
+can get exhausted, at which point the program will no longer work.
+
+This can only be fixed in the application code: it is _not_ a bug in
+the S3A filesystem.
+
+1. Applications MUST call `close()` on an input stream when the contents of
+   the file are no longer needed.
+2. If long-lived applications eventually fail with unrecoverable
+   `ApiCallTimeout` exceptions, they are not doing so.
+
+To aid in identifying the location of these leaks, when a JVM garbage
+collection releases an unreferenced `S3AInputStream` instance,
+it will log at `WARN` level that it has not been closed,
+listing the file URL, and the thread name + ID of the thread
+which opened the file.
+The stack trace of the `open()` call will be logged at `INFO`

Review Comment:
   NM, got it: it is getting initialized in S3AInputStream.
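
(Aside for readers of the documentation quoted above: a hedged sketch of the two remedies it describes, enlarging the pool via `fs.s3a.connection.maximum` and releasing connections on long-lived streams with `unbuffer()`. The bucket, file name and pool size are illustrative only.)

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConnectionPoolRemedies {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Remedy 1: enlarge the HTTP connection pool; size it against the
    // number of worker threads actually issuing S3 reads (value illustrative).
    conf.setInt("fs.s3a.connection.maximum", 512);

    Path path = new Path("s3a://example-bucket/data/part-0000.csv");
    try (FileSystem fs = path.getFileSystem(conf);
         FSDataInputStream in = fs.open(path)) {
      in.read();
      // Remedy 2: a stream deliberately kept open between bursts of work
      // should hand its connection back to the pool...
      in.unbuffer();
      // ...the next read transparently re-acquires one, which is still
      // cheaper than a fresh open(Path).
      in.read();
    }
  }
}
```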



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.lang.ref.WeakReference;
+import java.time.Duration;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_LEAKS;
+import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
+
+/**
+ * Test Stream leakage.
+ */
+public class ITestS3AInputStreamLeakage extends AbstractS3ATestBase {
+
+  /**
+   * How big a file to create?
+   */
+  public static final int FILE_SIZE = 1024;
+
+  public static final byte[] DATASET = dataset(FILE_SIZE, '0', 10);
+
+  /**
+   * Time to wait after a GC/finalize is triggered before looking at the log.
+   */
+  public static final long GC_DELAY = Duration.ofSeconds(1).toMillis();
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    assume("Stream leak detection not available",
+        getFileSystem().hasCapability(STREAM_LEAKS));
+  }
+
+  /**
+   * This test forces a GC of an open file then verifies that the
+   * log contains the error message.
+   * <p>
+   * Care is needed here to ensure that no strong references are held to the
+   * stream, otherwise: no GC.
+   * <p>
+   * It also assumes that {@code System.gc()} will do enough of a treewalk to
+   * prepare the stream for garbage collection (a weak ref is used to verify
+   * that it was removed as a reference), and that
+   * {@code System.runFinalization()} will then
+   * invoke the finalization.
+   * <p>
+   * The finalize code runs in its own thread "Finalizer"; this is async enough
+   * that assertions on log entries only work if there is a pause after
+   * finalization is triggered and the log is reviewed.
+   * <p>
+   * The stream leak counter of the FileSystem is also updated; this
+   * is verified.
+   * <p>
+   * Note: if the stream under test is not an S3AInputStream (i.e. is a prefetching one),
+   * this test is skipped. If/when the prefetching stream adds the same code,
+   * this check can be removed.
+   */
+  @Test
+  public void testFinalizer() throws Throwable {
+    Path path = methodPath();
+    final S3AFileSystem fs = getFileSystem();
+
+    ContractTestUtils.createFile(fs, path, true, DATASET);
+
+    // DO NOT use try-with-resources; this
+    // test MUST be able to remove all references
+    // to the stream
+    FSDataInputStream in = fs.open(path);
+
+    try {
+      Assertions.assertThat(in.hasCapability(STREAM_LEAKS))
+          .describedAs("Stream leak detection not supported in: " + in.getClass())
+          .isTrue();
+
+      Assertions.assertThat(in.read())
+          .describedAs("first byte read from %s", in)
+          .isEqualTo(DATASET[0]);
+
+      // get a weak ref so that after a GC we can look for it and verify it is gone
+      Assertions.assertThat(((S3AInputStream) in.getWrappedStream()).isObjectStreamOpen())
+          .describedAs("stream http connection status")
+          .isTrue();
+      // weak reference to track GC progress
+      WeakReference<S3AInputStream> wrs =
+          new WeakReference<>((S3AInputStream) in.getWrappedStream());
+
+      // Capture the logs
+      GenericTestUtils.LogCapturer logs =
+          captureLogs(LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
+
+      LOG.info("captured log");
+
+      // remove strong reference to the stream
+      in = null;
+      // force the gc.
+      System.gc();

Review Comment:
   this doesn't necessarily always trigger GC, but is best-effort.
   I remember working on similar tests, but now you have found runFinalization; never used this.
   But isn't gc supposed to run finalize anyway?
   Reading the javadoc, I think gc will do that, but this will ensure it is done.
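
(On the GC point: `System.gc()` is indeed only a hint, and finalization runs asynchronously on the "Finalizer" thread, which is why the test also calls `System.runFinalization()` and then waits before inspecting the captured log. A minimal standalone sketch of that pattern follows; the sleep interval is arbitrary.)

```java
import java.lang.ref.WeakReference;

public class FinalizationProbe {

  static final class Tracked {
    @SuppressWarnings("deprecation")
    @Override
    protected void finalize() {
      // in the real test this is where the leak warning would be logged
      System.out.println("finalized on " + Thread.currentThread().getName());
    }
  }

  public static void main(String[] args) throws Exception {
    Tracked tracked = new Tracked();
    WeakReference<Tracked> ref = new WeakReference<>(tracked);

    tracked = null;               // drop the only strong reference
    System.gc();                  // best-effort hint, not a guarantee
    System.runFinalization();     // ask the JVM to run pending finalizers now
    Thread.sleep(1_000);          // finalization is async ("Finalizer" thread)

    // a cleared weak reference means the object really was collected
    System.out.println("collected: " + (ref.get() == null));
  }
}
```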



##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md:
##########
@@ -485,6 +485,90 @@ If `storediag` doesn't connect to your S3 store, *nothing else will*.
 Based on the experience of people who field support calls, here are
 some of the main connectivity issues which cause problems.
 
+### <a name="Not-enough-connections"></a> Connection pool overloaded
+
+If more connections are needed than the HTTP connection pool has,
+then worker threads will block until one is freed.
+
+If the wait exceeds the time set in `fs.s3a.connection.acquisition.timeout`,
+the operation will fail with `Timeout waiting for connection from pool`.
+
+This may be retried, but time has been lost, which results in slower operations.
+If queries suddenly get slower as the number of active operations increases,
+then this is a possible cause.
+
+Fixes:
+
+Increase the value of `fs.s3a.connection.maximum`.
+This is the general fix on query engines such as Apache Spark and Apache Impala,
+which run many worker threads simultaneously and do not keep files open past
+the duration of a single task within a larger query.
+
+It can also surface with applications which deliberately keep files open
+for extended periods.
+These should ideally call `unbuffer()` on the input streams.
+This will free up the connection until another read operation is invoked, yet
+still re-open faster than if `open(Path)` were invoked.
+
+Applications may also be "leaking" HTTP connections by failing to
+`close()` them. This is potentially fatal: eventually the connection pool
+can get exhausted, at which point the program will no longer work.
+
+This can only be fixed in the application code: it is _not_ a bug in
+the S3A filesystem.
+
+1. Applications MUST call `close()` on an input stream when the contents of
+   the file are no longer needed.
+2. If long-lived applications eventually fail with unrecoverable
+   `ApiCallTimeout` exceptions, they are not doing so.
+
+To aid in identifying the location of these leaks, when a JVM garbage
+collection releases an unreferenced `S3AInputStream` instance,
+it will log at `WARN` level that it has not been closed,
+listing the file URL, and the thread name + ID of the thread
+which opened the file.
+The stack trace of the `open()` call will be logged at `INFO`

Review Comment:
   wondering where and how the stack trace of open is getting printed? 
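
(For anyone with the same question: the usual pattern, and apparently what happens here given the other review comment noting the reporter is initialized in `S3AInputStream`, is to create a throwable in the constructor so the call stack of `open()` is captured, then only log it when a leak is later reported. A hedged sketch with illustrative names:)

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Illustrative pattern: record the call stack at construction time,
 * print it only if the instance turns out to have been leaked.
 */
public class OpenSiteTracker {

  private static final Logger LOG =
      LoggerFactory.getLogger(OpenSiteTracker.class);

  // never thrown; creating it here snapshots the stack of the open() call
  private final Exception openSite = new Exception("stream opened here");

  /** Called from the leak reporter / finalizer when a leak is detected. */
  public void reportLeak(String path) {
    LOG.warn("Stream reading {} was not closed", path);
    // stack trace of the original open() call, at INFO
    LOG.info("stack of open()", openSite);
  }
}
```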





> S3AInputStream.finalizer to warn if closed with http connection -then release it
> --------------------------------------------------------------------------------
>
>                 Key: HADOOP-19330
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19330
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.1
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> A recurring problem is that applications forget to close their input streams; 
> eventually the HTTP connection pool runs out.
> Having the finalizer close streams during GC will ensure that after a GC the
> HTTP connections are returned. While this is an improvement on today, it is
> insufficient:
> * only happens during GC, so may not fix the problem entirely
> * doesn't let developers know things are going wrong.
> * doesn't let us differentiate well between stream leak and overloaded FS
> Proposed enhancements:
> * collect stack trace in constructor
> * log in finalize at warn including path, thread and stack
> * have a special log for this, so it can be turned off in production (libraries
> telling end users off for developer errors is simply an annoyance)
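
(Reader's sketch of the last bullet: giving the leak report its own logger name so deployments can silence it independently of normal S3A logging. The logger name and method are illustrative assumptions, not the actual patch.)

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LeakLog {

  // a dedicated, documented logger name lets operators silence leak
  // warnings (e.g. set it to ERROR in log4j.properties) without losing
  // other S3A logging; the name here is purely illustrative
  public static final String NAME = "org.apache.hadoop.fs.resource.leaks";

  private static final Logger LOG = LoggerFactory.getLogger(NAME);

  private LeakLog() {
  }

  public static void warnLeak(String path, String thread, Exception openSite) {
    if (LOG.isWarnEnabled()) {
      LOG.warn("Stream reading {} was not closed; opened in thread {}", path, thread);
      LOG.info("stack at open()", openSite);
    }
  }
}
```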



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
