[ 
https://issues.apache.org/jira/browse/HADOOP-19354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913351#comment-17913351
 ] 

ASF GitHub Bot commented on HADOOP-19354:
-----------------------------------------

rajdchak commented on code in PR #7214:
URL: https://github.com/apache/hadoop/pull/7214#discussion_r1916876051


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStream.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.impl.LeakReporter;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+import static org.apache.hadoop.util.StringUtils.toLowerCase;
+
+/**
+ * A stream of data from an S3 object.
+ * The base class includes common methods, stores
+ * common data and incorporates leak tracking.
+ */
+public abstract class ObjectInputStream extends FSInputStream
+    implements StreamCapabilities, IOStatisticsSource {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ObjectInputStream.class);
+
+  /**
+   * IOStatistics report.
+   */
+  private final IOStatistics ioStatistics;
+
+  /**
+   * Read-specific operation context structure.
+   */
+  private final S3AReadOpContext context;
+
+  /**
+   * Callbacks for reading input stream data from the S3 Store.
+   */
+  private final ObjectInputStreamCallbacks callbacks;
+
+  /**
+   * Thread pool used for vectored IO operation.
+   */
+  private final ExecutorService boundedThreadPool;
+
+  /**
+   * URI of path.
+   */
+  private final String uri;
+
+  /**
+   * Store bucket.
+   */
+  private final String bucket;
+
+  /**
+   * Store key.
+   */
+  private final String key;
+
+  /**
+   * Path URI as a string.
+   */
+  private final String pathStr;
+
+  /**
+   * Content length from HEAD or openFile option.
+   */
+  private final long contentLength;
+
+  private final S3ObjectAttributes objectAttributes;
+
+  /**
+   * Stream statistics.
+   */
+  private final S3AInputStreamStatistics streamStatistics;
+
+  /** Aggregator used to aggregate per thread IOStatistics. */
+  private final IOStatisticsAggregator threadIOStatistics;
+
+  /**
+   * Leak reporter: reports unclosed streams and aborts them in finalize().
+   */
+  private final LeakReporter leakReporter;
+
+  /**
+   * Requested input policy.
+   */
+  private S3AInputPolicy inputPolicy;
+
+  /**
+   * Constructor.
+   * @param parameters extensible parameter list.
+   */
+  protected ObjectInputStream(
+      ObjectReadParameters parameters) {
+
+    objectAttributes = parameters.getObjectAttributes();
+    checkArgument(isNotEmpty(objectAttributes.getBucket()),
+        "No Bucket");
+    checkArgument(isNotEmpty(objectAttributes.getKey()), "No Key");
+    long l = objectAttributes.getLen();
+    checkArgument(l >= 0, "Negative content length");
+    this.context = parameters.getContext();
+    this.contentLength = l;
+
+    this.bucket = objectAttributes.getBucket();
+    this.key = objectAttributes.getKey();
+    this.pathStr = objectAttributes.getPath().toString();
+    this.callbacks = parameters.getCallbacks();
+    this.uri = "s3a://" + bucket + "/" + key;
+    this.streamStatistics = parameters.getStreamStatistics();
+    this.ioStatistics = streamStatistics.getIOStatistics();
+    this.inputPolicy = context.getInputPolicy();
+    streamStatistics.inputPolicySet(inputPolicy.ordinal());
+    this.boundedThreadPool = parameters.getBoundedThreadPool();

Review Comment:
   I see that boundedThreadPool is used in S3AInputStream but not in 
S3APrefetchingInputStream; can we keep boundedThreadPool local to 
S3AInputStream?





> S3A: InputStreams to be created by factory under S3AStore
> ---------------------------------------------------------
>
>                 Key: HADOOP-19354
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19354
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.2
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> Migrate S3AInputStream creation into a factory pattern, push down into 
> S3AStore.
> Proposed factories
> * default: whatever this release has as default
> * classic: current S3AInputStream
> * prefetch: prefetching
> * analytics: new analytics stream
> * other: reads a classname from another property and instantiates it.
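
The proposed name-to-factory mapping could be sketched as below. The enum and method names are illustrative assumptions, not the actual Hadoop implementation:

```java
import java.util.Locale;

// Illustrative sketch of the proposed factory selection; the enum and
// method names here are assumptions, not the real Hadoop implementation.
public final class StreamFactoryDemo {

  /** Factory kinds proposed in the issue. */
  enum InputStreamType { DEFAULT, CLASSIC, PREFETCH, ANALYTICS, OTHER }

  /** Map a configured name to a factory kind; unknown or unset falls back to DEFAULT. */
  static InputStreamType select(String name) {
    switch (name == null ? "" : name.toLowerCase(Locale.ROOT)) {
      case "classic":   return InputStreamType.CLASSIC;
      case "prefetch":  return InputStreamType.PREFETCH;
      case "analytics": return InputStreamType.ANALYTICS;
      case "other":     return InputStreamType.OTHER; // would then read a classname from another prop
      default:          return InputStreamType.DEFAULT;
    }
  }

  public static void main(String[] args) {
    System.out.println(select("prefetch"));  // PREFETCH
    System.out.println(select(null));        // DEFAULT
  }
}
```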
> Also proposed
> * streams to implement a stream capability declaring what they are 
> (classic, prefetch, analytics, other). 
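
Such a capability probe might look like the sketch below; the `fs.s3a.input.stream.` prefix is an assumption, as the issue does not fix the constant names:

```java
// Sketch of the proposed capability probe: a stream answers true only for
// the capability string naming its own kind. The "fs.s3a.input.stream."
// prefix is an assumption, not a constant defined by this issue.
public final class CapabilityDemo {

  static final String PREFIX = "fs.s3a.input.stream.";

  /** What a classic stream's hasCapability() override might look like. */
  static boolean classicHasCapability(String capability) {
    return (PREFIX + "classic").equalsIgnoreCase(capability);
  }

  public static void main(String[] args) {
    System.out.println(classicHasCapability(PREFIX + "classic"));   // true
    System.out.println(classicHasCapability(PREFIX + "prefetch"));  // false
  }
}
```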
> h2. Implementation
> All callbacks used by the stream are also to call directly onto S3AStore.
> S3AFileSystem must not be invoked at all (if it is needed, the PR is not 
> yet ready).
> Some interface from Instrumentation will be passed to the factory; this shall 
> include a way to create new per-stream statistics. 
> The factory shall implement org.apache.hadoop.service.Service; S3AStore shall 
> do same and become a subclass of CompositeService. It shall attach the 
> factory as a child, so they can follow the same lifecycle. We shall do the 
> same for anything else that gets pushed down.
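
The proposed lifecycle wiring can be sketched with a minimal, self-contained stand-in for `org.apache.hadoop.service.CompositeService` (the real class lives in hadoop-common; the names below are illustrative only):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the proposed lifecycle: the store service owns the
// stream factory as a child, so init/stop propagate. This mimics
// org.apache.hadoop.service.CompositeService without depending on Hadoop.
public final class LifecycleDemo {

  /** Tiny stand-in for Hadoop's Service interface. */
  interface Service {
    void init();
    void stop();
  }

  /** Stand-in for CompositeService: forwards lifecycle calls to children. */
  static class CompositeService implements Service {
    private final List<Service> children = new ArrayList<>();
    void addService(Service s) { children.add(s); }
    public void init() { children.forEach(Service::init); }
    public void stop() { children.forEach(Service::stop); }
  }

  /** The factory attaches as a child service, per the proposal. */
  static class StreamFactory implements Service {
    boolean initialized;
    public void init() { initialized = true; }
    public void stop() { initialized = false; }
  }

  public static void main(String[] args) {
    StreamFactory factory = new StreamFactory();
    CompositeService store = new CompositeService();
    store.addService(factory);   // would happen in S3AStore.initialize()
    store.init();
    System.out.println(factory.initialized);  // true
    store.stop();
    System.out.println(factory.initialized);  // false
  }
}
```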
> Everything related to stream creation must move out of s3afs, as must creation 
> of the factory itself; this must be done in S3AStore.initialize(). 
> As usual, this will complicate mocking, but the streams themselves should not 
> require changes, at least not significant ones.
> Testing.
> * The huge file tests should be tuned so that each of them always uses a 
> different stream.
> * use a -Dstream="factory name" option to choose the factory, rather than -Dprefetch
> * if not set, whatever is in auth-keys gets picked up.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
