mukund-thakur commented on code in PR #7151:
URL: https://github.com/apache/hadoop/pull/7151#discussion_r1841153728


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -5593,6 +5594,10 @@ public boolean hasPathCapability(final Path path, final 
String capability)
     case AWS_S3_ACCESS_GRANTS_ENABLED:
       return s3AccessGrantsEnabled;
 
+      // stream leak detection.
+    case StreamStatisticNames.STREAM_LEAKS:

Review Comment:
   disabled because we will have multiple prefetch streams reading data ? 



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestLeakReporter.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.fs.impl.LeakReporter.THREAD_FORMAT;
+import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
+
+public final class TestLeakReporter extends AbstractHadoopTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestLeakReporter.class);
+
+  /**
+   * Count of close calls.
+   */
+  private final AtomicInteger closeCount = new AtomicInteger();
+
+  /**
+   * Big test: creates a reporter, closes it.
+   * Verifies that the error message and stack traces is printed when
+   * open, and that the close callback was invoked.
+   * <p>
+   * After the first invocation, a second invocation is ignored.
+   */
+  @Test
+  public void testLeakInvocation() throws Throwable {
+
+    final String message = "<message>";
+    final LeakReporter reporter = new LeakReporter(message,
+        () -> true,
+        this::closed);
+
+    // store the old thread name and change it,
+    // so the log test can verify that the old thread name is printed.
+    String oldName = Thread.currentThread().getName();
+    Thread.currentThread().setName("thread");
+    // Capture the logs
+    GenericTestUtils.LogCapturer logs =
+        captureLogs(LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
+    expectClose(reporter, 1);
+
+    // check the log
+    logs.stopCapturing();
+    final String output = logs.getOutput();
+    LOG.info("output of leak log is {}", output);
+
+    final String threadInfo = String.format(THREAD_FORMAT,
+        oldName,
+        Thread.currentThread().getId());
+    // log auditing
+    Assertions.assertThat(output)
+        .describedAs("output from the logs")
+        .contains("WARN")
+        .contains(message)
+        .contains(Thread.currentThread().getName())
+        .contains(threadInfo)
+        .contains("TestLeakReporter.testLeakInvocation")
+        .contains("INFO")
+        .contains("stack");
+
+    // no reentrancy
+    expectClose(reporter, 1);
+  }
+
+  /**
+   * Expect the close operation to result in
+   * a value of the close count to be as expected.
+   * @param reporter leak reporter
+   * @param expected expected value after the close
+   */
+  private void expectClose(final LeakReporter reporter, final int expected) {
+    reporter.close();
+    assertCloseCount(expected);
+  }
+
+  /**
+   * Close operation: increments the counter.
+   */
+  private void closed() {
+    closeCount.incrementAndGet();
+  }
+
+  /**
+   * When the source is closed, no leak cleanup takes place.
+   */
+  @Test
+  public void testLeakSkipped() throws Throwable {
+
+    final LeakReporter reporter = new LeakReporter("<message>",
+        () -> false,
+        this::closed);
+    expectClose(reporter, 0);
+  }
+
+  /**
+   * If the probe raises an exception, the exception is swallowed
+   * and the close action is never invoked.
+   */
+  @Test
+  public void testProbeFailureSwallowed() throws Throwable {
+    final LeakReporter reporter = new LeakReporter("<message>",
+        this::raiseNPE,
+        this::closed);
+    expectClose(reporter, 0);
+  }
+
+  /**
+   * Any exception raised in the close action it is swallowed.
+   */
+  @Test
+  public void testCloseActionSwallowed() throws Throwable {
+    final LeakReporter reporter = new LeakReporter("<message>",
+        () -> true,
+        this::raiseNPE);
+    reporter.close();
+
+    Assertions.assertThat(reporter.isClosed())
+        .describedAs("reporter closed)")
+        .isTrue();
+  }
+
+  /**
+   * Always raises an NPE.
+   * @return never
+   */
+  private boolean raiseNPE() {
+    throw new NullPointerException("oops");
+  }
+
+  /**
+   * Assert that the value of {@link #closeCount} is as expected.
+   * @param ex expected.
+   */
+  private void assertCloseCount(final int ex) {
+    Assertions.assertThat(closeCount.get())
+        .describedAs("close count")
+        .isEqualTo(ex);
+  }
+}

Review Comment:
   nit: checkstyle blank line 



##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md:
##########
@@ -485,6 +485,90 @@ If `storediag` doesn't connect to your S3 store, *nothing 
else will*.
 Based on the experience of people who field support calls, here are
 some of the main connectivity issues which cause problems.
 
+### <a name="Not-enough-connections"></a> Connection pool overloaded
+
+If more connections are needed than the HTTP connection pool has,
+then worker threads will block until one is freed.
+
+If the wait exceeds the time set in `fs.s3a.connection.acquisition.timeout`,
+the operation will fail with `"Timeout waiting for connection from pool`.
+
+This may be retried, but time has been lost, which results in slower 
operations.
+If queries suddenly gets slower as the number of active operations increase,
+then this is a possible cause.
+
+Fixes:
+
+Increase the value of `fs.s3a.connection.maximum`.
+This is the general fix on query engines such as Apache Spark, and Apache 
Impala
+which run many workers threads simultaneously, and do not keep files open past
+the duration of a single task within a larger query.
+
+It can also surface with applications which deliberately keep files open
+for extended periods.
+These should ideally call `unbuffer()` on the input streams.
+This will free up the connection until another read operation is invoked -yet
+still re-open faster than if `open(Path)` were invoked.
+
+Applications may also be "leaking" http connections by failing to
+`close()` them. This is potentially fatal as eventually the connection pool
+can get exhausted -at which point the program will no longer work.
+
+This can only be fixed in the application code: it is _not_ a bug in
+the S3A filesystem.
+
+1. Applications MUST call `close()` on an input stream when the contents of
+   the file are longer needed.
+2. If long-lived applications eventually fail with unrecoverable
+   `ApiCallTimeout` exceptions, they are not doing so.
+
+To aid in identifying the location of these leaks, when a JVM garbage
+collection releases an unreferenced `S3AInputStream` instance,
+it will log at `WARN` level that it has not been closed,
+listing the file URL, and the thread name + ID of the the thread
+which creating the file.
+The the stack trace of the `open()` call will be logged at `INFO`

Review Comment:
   NM, got it. since it is getting initialized in S3AInputStream. 



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.lang.ref.WeakReference;
+import java.time.Duration;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_LEAKS;
+import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
+
+/**
+ * Test Stream leakage.
+ */
+public class ITestS3AInputStreamLeakage extends AbstractS3ATestBase {
+
+  /**
+   * How big a file to create?
+   */
+  public static final int FILE_SIZE = 1024;
+
+  public static final byte[] DATASET = dataset(FILE_SIZE, '0', 10);
+
+  /**
+   * Time to wait after a GC/finalize is triggered before looking at the log.
+   */
+  public static final long GC_DELAY = Duration.ofSeconds(1).toMillis();
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    assume("Stream leak detection not avaialable",
+        getFileSystem().hasCapability(STREAM_LEAKS));
+  }
+
+  /**
+   * This test forces a GC of an open file then verifies that the
+   * log contains the error message.
+   * <p>
+   * Care is needed here to ensure that no strong references are held to the
+   * stream, otherwise: no GC.
+   * <p>
+   * It also assumes that {@code System.gc()} will do enough of a treewalk to
+   * prepare the stream for garbage collection (a weak ref is used to verify
+   * that it was removed as a reference), and that
+   * {@code System.runFinalization()} will then
+   * invoke the finalization.
+   * <p>
+   * The finalize code runs its own thread "Finalizer"; this is async enough
+   * that assertions on log entries only work if there is a pause after
+   * finalization is triggered and the log is reviewed.
+   * <p>
+   * The stream leak counter of the FileSystem is also updated; this
+   * is verified.
+   * <p>
+   * Note: if the stream under test is not an S3AInputStream (i.e. is a 
prefetching one,
+   * this test is skipped. If/when the prefetching stream adds the same code,
+   * this check can be removed.
+   */
+  @Test
+  public void testFinalizer() throws Throwable {
+    Path path = methodPath();
+    final S3AFileSystem fs = getFileSystem();
+
+    ContractTestUtils.createFile(fs, path, true, DATASET);
+
+    // DO NOT use try-with-resources; this
+    // test MUST be able to remove all references
+    // to the stream
+    FSDataInputStream in = fs.open(path);
+
+    try {
+      Assertions.assertThat(in.hasCapability(STREAM_LEAKS))
+          .describedAs("Stream leak detection not supported in: " + 
in.getClass())
+          .isTrue();
+
+      Assertions.assertThat(in.read())
+          .describedAs("first byte read from %s", in)
+          .isEqualTo(DATASET[0]);
+
+      // get a weak ref so that after a GC we can look for it and verify it is 
gone
+      Assertions.assertThat(((S3AInputStream) 
in.getWrappedStream()).isObjectStreamOpen())
+          .describedAs("stream http connection status")
+          .isTrue();
+      // weak reference to track GC progress
+      WeakReference<S3AInputStream> wrs =
+          new WeakReference<>((S3AInputStream) in.getWrappedStream());
+
+      // Capture the logs
+      GenericTestUtils.LogCapturer logs =
+          captureLogs(LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
+
+      LOG.info("captured log");
+
+      // remove strong reference to the stream
+      in = null;
+      // force the gc.
+      System.gc();

Review Comment:
   this doesn't necessarily always trigger GC but is best effort. 
   I remember working on similar tests but now you have found runFinalization. 
Never used this. 
   but isn't gc supposed to run finalize anyway?  
   Reading the java doc I think gc will do that but this will ensure it is 
done. 



##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md:
##########
@@ -485,6 +485,90 @@ If `storediag` doesn't connect to your S3 store, *nothing 
else will*.
 Based on the experience of people who field support calls, here are
 some of the main connectivity issues which cause problems.
 
+### <a name="Not-enough-connections"></a> Connection pool overloaded
+
+If more connections are needed than the HTTP connection pool has,
+then worker threads will block until one is freed.
+
+If the wait exceeds the time set in `fs.s3a.connection.acquisition.timeout`,
+the operation will fail with `"Timeout waiting for connection from pool`.
+
+This may be retried, but time has been lost, which results in slower 
operations.
+If queries suddenly gets slower as the number of active operations increase,
+then this is a possible cause.
+
+Fixes:
+
+Increase the value of `fs.s3a.connection.maximum`.
+This is the general fix on query engines such as Apache Spark, and Apache 
Impala
+which run many workers threads simultaneously, and do not keep files open past
+the duration of a single task within a larger query.
+
+It can also surface with applications which deliberately keep files open
+for extended periods.
+These should ideally call `unbuffer()` on the input streams.
+This will free up the connection until another read operation is invoked -yet
+still re-open faster than if `open(Path)` were invoked.
+
+Applications may also be "leaking" http connections by failing to
+`close()` them. This is potentially fatal as eventually the connection pool
+can get exhausted -at which point the program will no longer work.
+
+This can only be fixed in the application code: it is _not_ a bug in
+the S3A filesystem.
+
+1. Applications MUST call `close()` on an input stream when the contents of
+   the file are longer needed.
+2. If long-lived applications eventually fail with unrecoverable
+   `ApiCallTimeout` exceptions, they are not doing so.
+
+To aid in identifying the location of these leaks, when a JVM garbage
+collection releases an unreferenced `S3AInputStream` instance,
+it will log at `WARN` level that it has not been closed,
+listing the file URL, and the thread name + ID of the the thread
+which creating the file.
+The the stack trace of the `open()` call will be logged at `INFO`

Review Comment:
   wondering where and how the stack trace of open is getting printed? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to