ahmarsuhail commented on code in PR #7214:
URL: https://github.com/apache/hadoop/pull/7214#discussion_r1925156781


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -1877,100 +1859,43 @@ private FSDataInputStream executeOpen(
     fileInformation.applyOptions(readContext);
     LOG.debug("Opening '{}'", readContext);
 
-    if (this.prefetchEnabled) {
-      Configuration configuration = getConf();
-      initLocalDirAllocatorIfNotInitialized(configuration);
-      return new FSDataInputStream(
-          new S3APrefetchingInputStream(
-              readContext.build(),
-              createObjectAttributes(path, fileStatus),
-              createInputStreamCallbacks(auditSpan),
-              inputStreamStats,
-              configuration,
-              directoryAllocator));
-    } else {
-      return new FSDataInputStream(
-          new S3AInputStream(
-              readContext.build(),
-              createObjectAttributes(path, fileStatus),
-              createInputStreamCallbacks(auditSpan),
-                  inputStreamStats,
-                  new SemaphoredDelegatingExecutor(
-                          boundedThreadPool,
-                          vectoredActiveRangeReads,
-                          true,
-                          inputStreamStats)));
-    }
-  }
-
-  /**
-   * Override point: create the callbacks for S3AInputStream.
-   * @return an implementation of the InputStreamCallbacks,
-   */
-  private S3AInputStream.InputStreamCallbacks createInputStreamCallbacks(
+    // what does the stream need
+    final StreamThreadOptions requirements =
+        getStore().threadRequirements();
+
+    // calculate the permit count.
+    final int permitCount = requirements.streamThreads() +
+        (requirements.vectorSupported()
+            ? vectoredActiveRangeReads
+            : 0);
+    // create an executor which is a subset of the
+    // bounded thread pool.
+    final SemaphoredDelegatingExecutor pool = new SemaphoredDelegatingExecutor(

Review Comment:
   OK, I think I get it: this is basically a way to ensure that a single stream
instance does not use up too many threads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to