Copilot commented on code in PR #6526:
URL: https://github.com/apache/hive/pull/6526#discussion_r3357284982


##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java:
##########
@@ -72,6 +73,71 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
     vectorizedSelf = new VectorizedParquetInputFormat();
   }
 
+  /**
+   * On blob storage with multiple recursive input directories, list them in parallel instead of the
+   * default serial per-directory listing that dominates split generation. Listed files flow through
+   * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
+   */
+  @Override
+  protected FileStatus[] listStatus(JobConf job) throws IOException {
+    Path[] dirs = getInputPaths(job);
+    // Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has
+    // subtler sub-directory semantics, so defer to the default.
+    if (dirs.length <= 1
+        || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
+        || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) {
+      return super.listStatus(job);
+    }

Review Comment:
   The parallel-listing path is enabled/disabled based on only 
`FileInputFormat.INPUT_DIR_RECURSIVE` and the FileSystem of `dirs[0]`. This can 
mis-detect recursive mode when `mapred.input.dir.recursive` is set (common in 
Hive/Tez), and it can also incorrectly take the parallel blob-storage path when 
later input dirs are on a non-blob filesystem.
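
   A minimal sketch of a stricter gate, using only JDK types so it stands alone. The class name `ParallelListingGate`, the `Map`-based configuration, and the scheme-based blob check are hypothetical stand-ins for `JobConf` and `BlobStorageUtils`; treating either recursive key as sufficient is an assumption, not confirmed Hive behavior.

```java
import java.net.URI;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: decides whether the parallel listing path applies.
final class ParallelListingGate {
  // Current and legacy recursive-listing keys; checking both is an assumption.
  static final String NEW_KEY = "mapreduce.input.fileinputformat.input.dir.recursive";
  static final String OLD_KEY = "mapred.input.dir.recursive";

  // Recursive mode is on if either key is set to true.
  static boolean isRecursive(Map<String, String> conf) {
    return Boolean.parseBoolean(conf.getOrDefault(NEW_KEY, "false"))
        || Boolean.parseBoolean(conf.getOrDefault(OLD_KEY, "false"));
  }

  // Every input dir must be on blob storage, not just the first one.
  static boolean allOnBlobStorage(List<URI> dirs, Set<String> blobSchemes) {
    for (URI dir : dirs) {
      String scheme = dir.getScheme();
      if (scheme == null || !blobSchemes.contains(scheme.toLowerCase(Locale.ROOT))) {
        return false;
      }
    }
    return true;
  }

  static boolean useParallelListing(
      Map<String, String> conf, List<URI> dirs, Set<String> blobSchemes) {
    return dirs.size() > 1 && isRecursive(conf) && allOnBlobStorage(dirs, blobSchemes);
  }
}
```

   With this shape, a mixed set of inputs (for example one `s3a` path and one `hdfs` path) falls back to the default listing.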



##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java:
##########
@@ -72,6 +73,71 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
     vectorizedSelf = new VectorizedParquetInputFormat();
   }
 
+  /**
+   * On blob storage with multiple recursive input directories, list them in parallel instead of the
+   * default serial per-directory listing that dominates split generation. Listed files flow through
+   * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
+   */
+  @Override
+  protected FileStatus[] listStatus(JobConf job) throws IOException {
+    Path[] dirs = getInputPaths(job);
+    // Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has
+    // subtler sub-directory semantics, so defer to the default.
+    if (dirs.length <= 1
+        || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
+        || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) {
+      return super.listStatus(job);
+    }
+
+    long start = System.currentTimeMillis();
+    // List as the caller's end-user, not the pool threads' login user; FileSystem.get is UGI-keyed.
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS));
+    ExecutorService pool = newWorkerPool(numThreads);
+    CompletionService<List<FileStatus>> completionService = new ExecutorCompletionService<>(pool);
+

Review Comment:
   `numThreads` is forced to at least 2 via `Math.max(2, ...)`, which prevents 
users from disabling parallelism by setting 
`hive.compute.splits.num.threads=1`. It also doesn't cap the pool size to the 
number of input dirs, causing unnecessary thread creation when `dirs.length` is 
small.
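
   One way to express the suggested sizing, as a sketch: honor the configured value (with a floor of 1 so a setting of 1 stays serial) and never exceed the number of input dirs. The class and method names here are hypothetical.

```java
// Hypothetical helper: picks the worker pool size for directory listing.
final class ListingPoolSize {
  // configuredThreads: value of hive.compute.splits.num.threads
  // numDirs: number of input directories to list
  static int poolSize(int configuredThreads, int numDirs) {
    // Floor of 1 (not 2), so users can disable parallelism by configuring 1.
    int requested = Math.max(1, configuredThreads);
    // No point in more threads than directories; keep at least one thread.
    return Math.max(1, Math.min(requested, numDirs));
  }
}
```

   A caller could treat a result of 1 as a signal to skip the pool entirely and defer to `super.listStatus(job)`.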



##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java:
##########
@@ -72,6 +73,71 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
     vectorizedSelf = new VectorizedParquetInputFormat();
   }
 
+  /**
+   * On blob storage with multiple recursive input directories, list them in parallel instead of the
+   * default serial per-directory listing that dominates split generation. Listed files flow through
+   * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
+   */
+  @Override
+  protected FileStatus[] listStatus(JobConf job) throws IOException {
+    Path[] dirs = getInputPaths(job);
+    // Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has
+    // subtler sub-directory semantics, so defer to the default.
+    if (dirs.length <= 1
+        || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
+        || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) {
+      return super.listStatus(job);
+    }
+
+    long start = System.currentTimeMillis();
+    // List as the caller's end-user, not the pool threads' login user; FileSystem.get is UGI-keyed.
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS));
+    ExecutorService pool = newWorkerPool(numThreads);
+    CompletionService<List<FileStatus>> completionService = new ExecutorCompletionService<>(pool);
+
+    List<FileStatus> files = new ArrayList<>();
+    try {
+      for (Path dir : dirs) {
+        completionService.submit(() -> ugi.doAs(
+            (PrivilegedExceptionAction<List<FileStatus>>) () -> {
+              FileSystem dirFs = dir.getFileSystem(job);
+              List<FileStatus> dirFiles = new ArrayList<>();
+              FileUtils.listStatusRecursively(dirFs, new FileStatus(0, true, 0, 0, 0, dir), dirFiles);

Review Comment:
   The recursive listing is started from a synthetic `FileStatus` that is 
always marked as a directory (`new FileStatus(..., true, ..., dir)`). If an 
input path is actually a file (or a glob resolves to a file), this can cause 
incorrect behavior or errors on filesystems that don't allow `listStatus` on 
files. Using the real `FileStatus` preserves correct file-vs-directory handling.



##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java:
##########
@@ -72,6 +73,71 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
     vectorizedSelf = new VectorizedParquetInputFormat();
   }
 
+  /**
+   * On blob storage with multiple recursive input directories, list them in parallel instead of the
+   * default serial per-directory listing that dominates split generation. Listed files flow through
+   * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
+   */
+  @Override
+  protected FileStatus[] listStatus(JobConf job) throws IOException {
+    Path[] dirs = getInputPaths(job);

Review Comment:
   `listStatus(JobConf)` now has substantial new behavior (parallel recursive 
listing gated on blob storage + recursion + multiple dirs), but there are no 
unit tests covering the new branch. Adding tests would help ensure it preserves 
FileInputFormat semantics (e.g., file vs dir inputs, recursion gating, and blob 
vs non-blob behavior).



##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java:
##########
@@ -72,6 +73,71 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
     vectorizedSelf = new VectorizedParquetInputFormat();
   }
 
+  /**
+   * On blob storage with multiple recursive input directories, list them in parallel instead of the
+   * default serial per-directory listing that dominates split generation. Listed files flow through
+   * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
+   */
+  @Override
+  protected FileStatus[] listStatus(JobConf job) throws IOException {
+    Path[] dirs = getInputPaths(job);
+    // Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has
+    // subtler sub-directory semantics, so defer to the default.
+    if (dirs.length <= 1
+        || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
+        || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) {
+      return super.listStatus(job);
+    }
+
+    long start = System.currentTimeMillis();
+    // List as the caller's end-user, not the pool threads' login user; FileSystem.get is UGI-keyed.
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS));
+    ExecutorService pool = newWorkerPool(numThreads);
+    CompletionService<List<FileStatus>> completionService = new ExecutorCompletionService<>(pool);
+
+    List<FileStatus> files = new ArrayList<>();
+    try {
+      for (Path dir : dirs) {
+        completionService.submit(() -> ugi.doAs(
+            (PrivilegedExceptionAction<List<FileStatus>>) () -> {
+              FileSystem dirFs = dir.getFileSystem(job);
+              List<FileStatus> dirFiles = new ArrayList<>();
+              FileUtils.listStatusRecursively(dirFs, new FileStatus(0, true, 0, 0, 0, dir), dirFiles);
+              return dirFiles;
+            }));
+      }
+      for (int resultsLeft = dirs.length; resultsLeft > 0; resultsLeft--) {
+        files.addAll(completionService.take().get());
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while listing input directories", e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+      throw new IOException("Failed to list input directories", cause);
+    } finally {

Review Comment:
   If a worker task is interrupted (e.g., due to `shutdownNow()` during error 
handling), `ExecutionException.getCause()` can be an `InterruptedException`. 
Currently this path wraps it in an `IOException` without restoring the 
interrupt flag on the calling thread.
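
   A sketch of how the unwrapping could restore the flag, using only JDK classes. The helper name `ListingFailures.toIOException` is hypothetical; the messages mirror the ones in the diff.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

// Hypothetical helper: converts a worker failure into the IOException to rethrow.
final class ListingFailures {
  static IOException toIOException(ExecutionException e) {
    Throwable cause = e.getCause();
    if (cause instanceof InterruptedException) {
      // The worker was interrupted: restore the flag on the calling thread
      // so callers further up can still observe the interruption.
      Thread.currentThread().interrupt();
      return new IOException("Interrupted while listing input directories", cause);
    }
    if (cause instanceof IOException) {
      // Preserve the original I/O failure unchanged.
      return (IOException) cause;
    }
    return new IOException("Failed to list input directories", cause);
  }
}
```

   The `catch (ExecutionException e)` block would then reduce to `throw ListingFailures.toIOException(e);`.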



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

