[
https://issues.apache.org/jira/browse/HADOOP-18410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585289#comment-17585289
]
ASF GitHub Bot commented on HADOOP-18410:
-----------------------------------------
steveloughran commented on code in PR #4766:
URL: https://github.com/apache/hadoop/pull/4766#discussion_r955898631
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import com.amazonaws.internal.SdkFilterInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+
+import static java.util.Objects.requireNonNull;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.DRAIN_BUFFER_SIZE;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * Drains/aborts s3 or other AWS SDK streams.
+ * It is callable so can be passed directly to a submitter
+ * for async invocation.
+ * A request object may be passed in; it will be implicitly
+ * cached until this object is GCd.
+ * This is because in some versions of the AWS SDK, the S3Object
+ * has a finalize() method which releases the http connection,
+ * even when the stream is still open.
+ * See HADOOP-17338 for details.
+ */
+public class SDKStreamDrainer implements CallableRaisingIOE<Boolean> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ SDKStreamDrainer.class);
+
+ /**
+ * URI of the object; used only in log messages.
+ */
+ private final String uri;
+
+ /**
+ * Request object; usually an S3Object.
+ * Never used directly, but a reference is held so that the http
+ * connection stays open long enough for draining to take place
+ * (see HADOOP-17338).
+ */
+ @Nullable
+ private final Closeable requestObject;
+
+ /**
+ * Stream from the {@link #requestObject} for draining and closing.
+ */
+ private final SdkFilterInputStream inner;
+
+ /**
+ * Should the request be aborted rather than drained?
+ * Fixed by the caller at construction time; a failed drain may
+ * still escalate to an abort.
+ */
+ private final boolean shouldAbort;
+
+ /**
+ * How many bytes remain to be read?
+ * This is decremented as the stream is
+ * drained.
+ * If the stream finishes before the expected
+ * remaining value was read, this will show how many
+ * bytes were still expected.
+ */
+ private int remaining;
+
+ /**
+ * Statistics to update with the duration of the drain/abort.
+ */
+ private final S3AInputStreamStatistics streamStatistics;
+
+ /**
+ * Reason for closing the stream; used in log messages.
+ */
+ private final String reason;
+
+ /**
+ * Has the operation executed yet?
+ * Set on the first invocation; any further invocation is rejected
+ * with an IllegalStateException.
+ */
+ private final AtomicBoolean executed = new AtomicBoolean(false);
+
+ /**
+ * Any exception caught during execution.
+ * Saved by apply() and rethrown by applyRaisingException().
+ */
+ private Exception thrown;
+
+ /**
+ * Was the stream aborted?
+ * Updated with the outcome of a successful apply().
+ */
+ private boolean aborted;
+
+ /**
+ * How many bytes were drained?
+ */
+ private int drained = 0;
+
+ /**
+ * Prepare to drain the stream.
+ * A reference to the request object is retained so that it is not
+ * garbage collected (and its http connection released) while the
+ * stream is still being drained.
+ * @param uri URI for messages
+ * @param requestObject http request object; needed to avoid GC issues.
+ * @param inner stream to close.
+ * @param shouldAbort force an abort; used if explicitly requested.
+ * @param remaining remaining bytes
+ * @param streamStatistics stats to update
+ * @param reason reason for stream being closed; used in messages
+ * @throws NullPointerException if {@code inner} or
+ * {@code streamStatistics} is null; the message names the argument.
+ */
+ public SDKStreamDrainer(final String uri,
+ @Nullable final Closeable requestObject,
+ final SdkFilterInputStream inner,
+ final boolean shouldAbort,
+ final int remaining,
+ final S3AInputStreamStatistics streamStatistics,
+ final String reason) {
+ this.uri = uri;
+ this.requestObject = requestObject;
+ this.inner = requireNonNull(inner, "inner");
+ this.shouldAbort = shouldAbort;
+ this.remaining = remaining;
+ this.streamStatistics = requireNonNull(streamStatistics,
+ "streamStatistics");
+ this.reason = reason;
+ }
+
+ /**
+ * Drain or abort the stream, recording the duration of the
+ * operation in the stream statistics.
+ * Suitable for direct or asynchronous invocation: any Exception
+ * raised is caught and saved in {@link #thrown} rather than
+ * propagated.
+ * @return true if the stream was aborted; if the operation failed,
+ * the current value of {@link #aborted}.
+ */
+ @Override
+ public Boolean apply() {
+ try {
+ aborted = invokeTrackingDuration(
+ streamStatistics.initiateInnerStreamClose(shouldAbort),
+ this::drainOrAbortHttpStream);
+ } catch (Exception e) {
+ thrown = e;
+ }
+ return aborted;
+ }
+
+ /**
+ * Apply, raising any exception.
+ * For testing.
+ * @return the outcome.
+ * @throws Exception anything raised.
+ */
+ @VisibleForTesting
+ boolean applyRaisingException() throws Exception {
+ Boolean outcome = apply();
+ if (thrown != null) {
+ throw thrown;
+ }
+ return outcome;
+ }
+
+ /**
+ * Drain or abort the inner stream.
+ * Exceptions are saved then swallowed.
+ * If a close() is attempted and fails, the operation escalates to
+ * an abort.
+ */
+ private boolean drainOrAbortHttpStream() {
+ if (executed.getAndSet(true)) {
+ throw new IllegalStateException(
+ "duplicate invocation of drain operation");
+ }
+ boolean executeAbort = shouldAbort;
Review Comment:
shouldAbort is a final argument; executeAbort is a mutable local copy of it,
which will be set to true if draining doesn't work.
> S3AInputStream.unbuffer() async drain not releasing http connections
> --------------------------------------------------------------------
>
> Key: HADOOP-18410
> URL: https://issues.apache.org/jira/browse/HADOOP-18410
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.3.9
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Blocker
> Labels: pull-request-available
>
> An Impala TPC-DS setup against S3 is hitting timeouts fetching HTTP
> connections from the S3A filesystem's connection pool. Disabling S3A async
> drain makes this problem *go away*. Assumption: either those async operations
> are blocking, or they are not releasing references properly.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]