steveloughran commented on code in PR #4766: URL: https://github.com/apache/hadoop/pull/4766#discussion_r955898631
########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java: ########## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.Closeable; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.annotation.Nullable; + +import com.amazonaws.internal.SdkFilterInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.util.functional.CallableRaisingIOE; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DRAIN_BUFFER_SIZE; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration; +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; + +/** + * Drains/aborts s3 or other AWS SDK streams. + * It is callable so can be passed directly to a submitter + * for async invocation. + * A request object may be passed in; it will be implicitly + * cached until this object is GCd. + * This is because in some versions of the AWS SDK, the S3Object + * has a finalize() method which releases the http connection, + * even when the stream is still open. + * See HADOOP-17338 for details. + */ +public class SDKStreamDrainer implements CallableRaisingIOE<Boolean> { + + private static final Logger LOG = LoggerFactory.getLogger( + SDKStreamDrainer.class); + + /** + * URI for log messages. + */ + private final String uri; + + /** + * Request object; usually S3Object + * Never used, but needed to keep the http connection + * open long enough for draining to take place. + */ + @Nullable + private final Closeable requestObject; + + /** + * Stream from the {@link #requestObject} for draining and closing. + */ + private final SdkFilterInputStream inner; + + /** + * Should the request be aborted? + */ + private final boolean shouldAbort; + + /** + * How many bytes remaining? + * This is decremented as the stream is + * drained; + * If the stream finished before the expected + * remaining value was read, this will show how many + * bytes were still expected. + */ + private int remaining; + + /** + * Statistics to update with the duration. + */ + private final S3AInputStreamStatistics streamStatistics; + + /** + * Reason? for log messages. + */ + private final String reason; + + /** + * Has the operation executed yet? + */ + private final AtomicBoolean executed = new AtomicBoolean(false); + + /** + * Any exception caught during execution. + */ + private Exception thrown; + + /** + * Was the stream aborted? + */ + private boolean aborted; + + /** + * how many bytes were drained? + */ + private int drained = 0; + + /** + * Prepare to drain the stream. + * @param uri URI for messages + * @param requestObject http request object; needed to avoid GC issues. + * @param inner stream to close. + * @param shouldAbort force an abort; used if explicitly requested. + * @param streamStatistics stats to update + * @param reason reason for stream being closed; used in messages + * @param remaining remaining bytes + */ + public SDKStreamDrainer(final String uri, + @Nullable final Closeable requestObject, + final SdkFilterInputStream inner, + final boolean shouldAbort, + final int remaining, + final S3AInputStreamStatistics streamStatistics, + final String reason) { + this.uri = uri; + this.requestObject = requestObject; + this.inner = requireNonNull(inner); + this.shouldAbort = shouldAbort; + this.remaining = remaining; + this.streamStatistics = requireNonNull(streamStatistics); + this.reason = reason; + } + + /** + * drain the stream. This method is intended to be + * used directly or asynchronously, and measures the + * duration of the operation in the stream statistics. + * @return was the stream aborted? + */ + @Override + public Boolean apply() { + try { + Boolean outcome = invokeTrackingDuration( + streamStatistics.initiateInnerStreamClose(shouldAbort), + this::drainOrAbortHttpStream); + aborted = outcome; + return outcome; + } catch (Exception e) { + thrown = e; + return aborted; + } + } + + /** + * Apply, raising any exception. + * For testing. + * @return the outcome. + * @throws Exception anything raised. + */ + @VisibleForTesting + boolean applyRaisingException() throws Exception { + Boolean outcome = apply(); + if (thrown != null) { + throw thrown; + } + return outcome; + } + + /** + * Drain or abort the inner stream. + * Exceptions are saved then swallowed. + * If a close() is attempted and fails, the operation escalates to + * an abort. + */ + private boolean drainOrAbortHttpStream() { + if (executed.getAndSet(true)) { + throw new IllegalStateException( + "duplicate invocation of drain operation"); + } + boolean executeAbort = shouldAbort; Review Comment: shouldAbort is a final arg; executeAbort will be set to true if draining doesn't work -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
