rajdchak commented on code in PR #7214: URL: https://github.com/apache/hadoop/pull/7214#discussion_r1916876051
########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStream.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl.streams; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.impl.LeakReporter; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; +import org.apache.hadoop.fs.s3a.S3AReadOpContext; +import org.apache.hadoop.fs.s3a.S3ObjectAttributes; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.StreamStatisticNames; + +import static java.util.Objects.requireNonNull; +import static org.apache.commons.lang3.StringUtils.isNotEmpty; +import static org.apache.hadoop.util.Preconditions.checkArgument; +import static org.apache.hadoop.util.StringUtils.toLowerCase; + +/** + * A stream of data from an S3 object. + * The blase class includes common methods, stores + * common data and incorporates leak tracking. + */ +public abstract class ObjectInputStream extends FSInputStream + implements StreamCapabilities, IOStatisticsSource { + + private static final Logger LOG = + LoggerFactory.getLogger(ObjectInputStream.class); + + /** + * IOStatistics report. + */ + private final IOStatistics ioStatistics; + + /** + * Read-specific operation context structure. + */ + private final S3AReadOpContext context; + + /** + * Callbacks for reading input stream data from the S3 Store. + */ + private final ObjectInputStreamCallbacks callbacks; + + /** + * Thread pool used for vectored IO operation. + */ + private final ExecutorService boundedThreadPool; + + /** + * URI of path. + */ + private final String uri; + + /** + * Store bucket. + */ + private final String bucket; + + /** + * Store key. + */ + private final String key; + + /** + * Path URI as a string. + */ + private final String pathStr; + + /** + * Content length from HEAD or openFile option. + */ + private final long contentLength; + + private final S3ObjectAttributes objectAttributes; + + /** + * Stream statistics. + */ + private final S3AInputStreamStatistics streamStatistics; + + /** Aggregator used to aggregate per thread IOStatistics. */ + private final IOStatisticsAggregator threadIOStatistics; + + /** + * Report of leaks. + * with report and abort unclosed streams in finalize(). + */ + private final LeakReporter leakReporter; + + /** + * Requested input policy. + */ + private S3AInputPolicy inputPolicy; + + /** + * Constructor. + * @param parameters extensible parameter list. + */ + protected ObjectInputStream( + ObjectReadParameters parameters) { + + objectAttributes = parameters.getObjectAttributes(); + checkArgument(isNotEmpty(objectAttributes.getBucket()), + "No Bucket"); + checkArgument(isNotEmpty(objectAttributes.getKey()), "No Key"); + long l = objectAttributes.getLen(); + checkArgument(l >= 0, "Negative content length"); + this.context = parameters.getContext(); + this.contentLength = l; + + this.bucket = objectAttributes.getBucket(); + this.key = objectAttributes.getKey(); + this.pathStr = objectAttributes.getPath().toString(); + this.callbacks = parameters.getCallbacks(); + this.uri = "s3a://" + bucket + "/" + key; + this.streamStatistics = parameters.getStreamStatistics(); + this.ioStatistics = streamStatistics.getIOStatistics(); + this.inputPolicy = context.getInputPolicy(); + streamStatistics.inputPolicySet(inputPolicy.ordinal()); + this.boundedThreadPool = parameters.getBoundedThreadPool(); Review Comment: I see boundedThreadPool is used in S3AInputStream but not in S3APrefetchingInputStream, can we keep boundedThreadPool local to S3AInputStream? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
