[ 
https://issues.apache.org/jira/browse/HADOOP-18410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585303#comment-17585303
 ] 

ASF GitHub Bot commented on HADOOP-18410:
-----------------------------------------

steveloughran commented on code in PR #4766:
URL: https://github.com/apache/hadoop/pull/4766#discussion_r955917752


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import com.amazonaws.internal.SdkFilterInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.DRAIN_BUFFER_SIZE;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * Drains/aborts s3 or other AWS SDK streams.
+ * It is callable so can be passed directly to a submitter
+ * for async invocation.
+ * A request object may be passed in; it will be implicitly
+ * cached until this object is GCd.
+ * This is because in some versions of the AWS SDK, the S3Object
+ * has a finalize() method which releases the http connection,
+ * even when the stream is still open.
+ * See HADOOP-17338 for details.
+ */
+public class SDKStreamDrainer implements CallableRaisingIOE<Boolean> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SDKStreamDrainer.class);
+
+  /**
+   * URI for log messages.
+   */
+  private final String uri;
+
+  /**
+   * Request object; usually S3Object
+   * Never used, but needed to keep the http connection
+   * open long enough for draining to take place.
+   */
+  @Nullable
+  private final Closeable requestObject;
+
+  /**
+   * Stream from the {@link #requestObject} for draining and closing.
+   */
+  private final SdkFilterInputStream inner;
+
+  /**
+   * Should the request be aborted?
+   */
+  private final boolean shouldAbort;
+
+  /**
+   * How many bytes remaining?
+   * This is decremented as the stream is
+   * drained;
+   * If the stream finished before the expected
+   * remaining value was read, this will show how many
+   * bytes were still expected.
+   */
+  private int remaining;
+
+  /**
+   * Statistics to update with the duration.
+   */
+  private final S3AInputStreamStatistics streamStatistics;
+
+  /**
+   * Reason? for log messages.
+   */
+  private final String reason;
+
+  /**
+   * Has the operation executed yet?
+   */
+  private final AtomicBoolean executed = new AtomicBoolean(false);
+
+  /**
+   * Any exception caught during execution.
+   */
+  private Exception thrown;
+
+  /**
+   * Was the stream aborted?
+   */
+  private boolean aborted;
+
+  /**
+   * how many bytes were drained?
+   */
+  private int drained = 0;
+
+  /**
+   * Prepare to drain the stream.
+   * @param uri URI for messages
+   * @param requestObject http request object; needed to avoid GC issues.
+   * @param inner stream to close.
+   * @param shouldAbort force an abort; used if explicitly requested.
+   * @param streamStatistics stats to update
+   * @param reason reason for stream being closed; used in messages
+   * @param remaining remaining bytes
+   */
+  public SDKStreamDrainer(final String uri,
+      @Nullable final Closeable requestObject,
+      final SdkFilterInputStream inner,
+      final boolean shouldAbort,
+      final int remaining,
+      final S3AInputStreamStatistics streamStatistics,
+      final String reason) {
+    this.uri = uri;
+    this.requestObject = requestObject;
+    this.inner = requireNonNull(inner);
+    this.shouldAbort = shouldAbort;
+    this.remaining = remaining;
+    this.streamStatistics = requireNonNull(streamStatistics);
+    this.reason = reason;
+  }
+
+  /**
+   * drain the stream. This method is intended to be
+   * used directly or asynchronously, and measures the
+   * duration of the operation in the stream statistics.
+   * @return was the stream aborted?
+   */
+  @Override
+  public Boolean apply() {
+    try {
+      Boolean outcome = invokeTrackingDuration(
+          streamStatistics.initiateInnerStreamClose(shouldAbort),
+          this::drainOrAbortHttpStream);
+      aborted = outcome;
+      return outcome;
+    } catch (Exception e) {
+      thrown = e;
+      return aborted;
+    }
+  }
+
+  /**
+   * Apply, raising any exception.
+   * For testing.
+   * @return the outcome.
+   * @throws Exception anything raised.
+   */
+  @VisibleForTesting
+  boolean applyRaisingException() throws Exception {
+    Boolean outcome = apply();
+    if (thrown != null) {
+      throw thrown;
+    }
+    return outcome;
+  }
+
+  /**
+   * Drain or abort the inner stream.
+   * Exceptions are saved then swallowed.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
+   */
+  private boolean drainOrAbortHttpStream() {
+    if (executed.getAndSet(true)) {
+      throw new IllegalStateException(
+          "duplicate invocation of drain operation");
+    }
+    boolean executeAbort = shouldAbort;
+    LOG.debug("drain or abort reason {} remaining={} abort={}",
+        reason, remaining, executeAbort);
+
+    if (!executeAbort) {
+      try {
+        // clean close. This will read to the end of the stream,
+        // so, while cleaner, can be pathological on a multi-GB object
+
+        if (remaining > 0) {
+          // explicitly drain the stream
+          LOG.debug("draining {} bytes", remaining);
+          drained = 0;
+          int size = Math.min(remaining, DRAIN_BUFFER_SIZE);
+          byte[] buffer = new byte[size];
+          // read the data; bail out early if
+          // the connection breaks.
+          // this may be a bit overaggressive on buffer underflow.
+          while (remaining > 0) {
+            final int count = inner.read(buffer);
+            LOG.debug("read {} bytes", count);
+            if (count <= 0) {
+              // no more data is left
+              break;
+            }
+            drained += count;
+            remaining -= count;
+          }
+          LOG.debug("Drained stream of {} bytes", drained);
+        }
+
+        if (remaining != 0) {
+          // fewer bytes than expected came back; not treating as a
+          // reason to escalate to an abort().
+          // just log.
+          LOG.debug("drained fewer bytes than expected; {} remaining",
+              remaining);
+        }
+
+        // now close it.
+        // if there is still data in the stream, the SDK
+        // will warn and escalate to an abort itself.
+        LOG.debug("Closing stream");
+        inner.close();
+
+        cleanupWithLogger(LOG, requestObject);
+        // this MUST come after the close, so that if the IO operations fail
+        // and an abort is triggered, the initial attempt's statistics
+        // aren't collected.
+        streamStatistics.streamClose(false, drained);
+        return false;
+      } catch (Exception e) {
+        // exception escalates to an abort
+        LOG.debug("When closing {} stream for {}, will abort the stream",

Review Comment:
   don't want logs full of warnings as they create issues of their own. seen it 
too many times. 





> S3AInputStream.unbuffer() async drain not releasing http connections
> --------------------------------------------------------------------
>
>                 Key: HADOOP-18410
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18410
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.3.9
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Blocker
>              Labels: pull-request-available
>
> Impala tcp-ds setup to s3 is hitting problems with timeout fetching http 
> connections from the s3a fs pool. Disabling s3a async drain makes this 
> problem *go away*. assumption, either those async ops are blocking, or they 
> are not releasing references properly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to