Copilot commented on code in PR #6526:
URL: https://github.com/apache/hive/pull/6526#discussion_r3357284982
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java:
##########
@@ -72,6 +73,71 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
vectorizedSelf = new VectorizedParquetInputFormat();
}
+ /**
+ * On blob storage with multiple recursive input directories, list them in parallel instead of the
+ * default serial per-directory listing that dominates split generation. Listed files flow through
+ * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
+ */
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ Path[] dirs = getInputPaths(job);
+ // Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has
+ // subtler sub-directory semantics, so defer to the default.
+ if (dirs.length <= 1
+ || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
+ || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) {
+ return super.listStatus(job);
+ }
Review Comment:
The parallel-listing path is enabled or disabled based only on
`FileInputFormat.INPUT_DIR_RECURSIVE` and the FileSystem of `dirs[0]`. This can
mis-detect recursive mode when `mapred.input.dir.recursive` is set (common in
Hive/Tez), and it can also incorrectly take the parallel blob-storage path when
later input dirs are on a non-blob filesystem.
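The stricter gate this comment asks for can be sketched in plain JDK types so it runs without Hadoop. The sketch rests on four assumptions:

- The job configuration can be read as a `Map<String, String>`.
- Recursion is enabled by either `mapreduce.input.fileinputformat.input.dir.recursive` (the key behind `FileInputFormat.INPUT_DIR_RECURSIVE`) or the legacy `mapred.input.dir.recursive`.
- Blob storage can be recognized by URI scheme.
- `ParallelListingGate`, `shouldListInParallel`, and the scheme set are hypothetical stand-ins for `JobConf` and `BlobStorageUtils.isBlobStorageFileSystem`.

```java
import java.net.URI;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

public final class ParallelListingGate {
    // Key behind FileInputFormat.INPUT_DIR_RECURSIVE, plus its legacy mapred.* name.
    static final String RECURSIVE_KEY = "mapreduce.input.fileinputformat.input.dir.recursive";
    static final String LEGACY_RECURSIVE_KEY = "mapred.input.dir.recursive";

    private ParallelListingGate() {}

    /** Hypothetical gate: parallel only when recursive, multi-dir, and EVERY dir is on blob storage. */
    static boolean shouldListInParallel(Map<String, String> conf, List<URI> dirs, Set<String> blobSchemes) {
        if (dirs.size() <= 1) {
            return false;
        }
        boolean recursive = Boolean.parseBoolean(conf.getOrDefault(RECURSIVE_KEY, "false"))
            || Boolean.parseBoolean(conf.getOrDefault(LEGACY_RECURSIVE_KEY, "false"));
        if (!recursive) {
            return false;
        }
        // Check all input dirs, not just dirs[0], so mixed blob/non-blob inputs fall back to serial.
        for (URI dir : dirs) {
            String scheme = dir.getScheme();
            if (scheme == null || !blobSchemes.contains(scheme.toLowerCase(Locale.ROOT))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Set<String> blob = Set.of("s3a", "abfs");
        List<URI> allBlob = List.of(URI.create("s3a://bucket/t/p=1"), URI.create("s3a://bucket/t/p=2"));
        List<URI> mixed = List.of(URI.create("s3a://bucket/t/p=1"), URI.create("hdfs://nn/t/p=2"));
        Map<String, String> legacyOnly = Map.of(LEGACY_RECURSIVE_KEY, "true");
        System.out.println(shouldListInParallel(legacyOnly, allBlob, blob)); // true
        System.out.println(shouldListInParallel(legacyOnly, mixed, blob));   // false
    }
}
```

Reading both keys is a defensive choice. Hadoop's deprecated-key mapping may already alias them, in which case the extra check is harmless.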
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java:
##########
@@ -72,6 +73,71 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
vectorizedSelf = new VectorizedParquetInputFormat();
}
+ /**
+ * On blob storage with multiple recursive input directories, list them in parallel instead of the
+ * default serial per-directory listing that dominates split generation. Listed files flow through
+ * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
+ */
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ Path[] dirs = getInputPaths(job);
+ // Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has
+ // subtler sub-directory semantics, so defer to the default.
+ if (dirs.length <= 1
+ || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
+ || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) {
+ return super.listStatus(job);
+ }
+
+ long start = System.currentTimeMillis();
+ // List as the caller's end-user, not the pool threads' login user; FileSystem.get is UGI-keyed.
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS));
+ ExecutorService pool = newWorkerPool(numThreads);
+ CompletionService<List<FileStatus>> completionService = new ExecutorCompletionService<>(pool);
+
Review Comment:
`numThreads` is forced to at least 2 via `Math.max(2, ...)`, which prevents
users from disabling parallelism by setting
`hive.compute.splits.num.threads=1`. It also doesn't cap the pool size to the
number of input dirs, causing unnecessary thread creation when `dirs.length` is
small.
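The pool sizing this comment suggests can be sketched as follows:

- Honor the configured value, so `1` means serial.
- Cap the pool at the number of input dirs.
- Signal a fallback to the serial `super.listStatus(job)` path when the result is at most one thread.

`ListingPoolSizing` and `effectiveThreads` are hypothetical names, not existing Hive methods.

```java
public final class ListingPoolSizing {
    private ListingPoolSizing() {}

    /**
     * Hypothetical helper: pool size for parallel listing.
     * Returns 0 when the caller should use the serial super.listStatus(job) path instead.
     */
    static int effectiveThreads(int configuredThreads, int numDirs) {
        // Never create more threads than there are directories to list.
        int threads = Math.min(configuredThreads, numDirs);
        // A configured value of 1 (or less) disables parallel listing entirely.
        return threads <= 1 ? 0 : threads;
    }

    public static void main(String[] args) {
        System.out.println(effectiveThreads(1, 50));  // 0: user disabled parallelism
        System.out.println(effectiveThreads(10, 3));  // 3: capped by the number of dirs
        System.out.println(effectiveThreads(10, 50)); // 10
    }
}
```

Using 0 as a "use serial" sentinel keeps the decision in one place. The caller would check it before creating the pool, so no threads are started on the fallback path.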
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java:
##########
@@ -72,6 +73,71 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
vectorizedSelf = new VectorizedParquetInputFormat();
}
+ /**
+ * On blob storage with multiple recursive input directories, list them in parallel instead of the
+ * default serial per-directory listing that dominates split generation. Listed files flow through
+ * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
+ */
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ Path[] dirs = getInputPaths(job);
+ // Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has
+ // subtler sub-directory semantics, so defer to the default.
+ if (dirs.length <= 1
+ || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
+ || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) {
+ return super.listStatus(job);
+ }
+
+ long start = System.currentTimeMillis();
+ // List as the caller's end-user, not the pool threads' login user; FileSystem.get is UGI-keyed.
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS));
+ ExecutorService pool = newWorkerPool(numThreads);
+ CompletionService<List<FileStatus>> completionService = new ExecutorCompletionService<>(pool);
+
+ List<FileStatus> files = new ArrayList<>();
+ try {
+ for (Path dir : dirs) {
+ completionService.submit(() -> ugi.doAs(
+ (PrivilegedExceptionAction<List<FileStatus>>) () -> {
+ FileSystem dirFs = dir.getFileSystem(job);
+ List<FileStatus> dirFiles = new ArrayList<>();
+ FileUtils.listStatusRecursively(dirFs, new FileStatus(0, true, 0, 0, 0, dir), dirFiles);
Review Comment:
The recursive listing is started from a synthetic `FileStatus` that is
always marked as a directory (`new FileStatus(..., true, ..., dir)`). If an
input path is actually a file (or a glob resolves to a file), this can cause
incorrect behavior or errors on filesystems that don't allow `listStatus` on
files. Using the real `FileStatus` preserves correct file-vs-directory handling.
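The file-versus-directory rule can be sketched with `java.nio.file` standing in for Hadoop's `FileSystem`. Each input path is stat'ed first, and the code only recurses into paths that really are directories, so a file input comes back as itself. In the Hadoop version, the starting status would presumably come from `dirFs.getFileStatus(dir)` rather than `new FileStatus(0, true, 0, 0, 0, dir)`. `RealStatusListing` and `listInput` are illustrative names, and globs and hidden-file filtering are not modeled.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class RealStatusListing {
    private RealStatusListing() {}

    /** Lists the files for one input path, deciding file vs directory from the real filesystem. */
    static List<Path> listInput(Path input) throws IOException {
        List<Path> results = new ArrayList<>();
        collect(input, results);
        return results;
    }

    private static void collect(Path path, List<Path> results) throws IOException {
        // Stat the path first (analogous to fs.getFileStatus(path)); a missing path throws.
        BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
        if (!attrs.isDirectory()) {
            // A file input is a result in its own right; it is never listed as a directory.
            results.add(path);
            return;
        }
        List<Path> children;
        try (Stream<Path> stream = Files.list(path)) {
            children = stream.collect(Collectors.toList());
        }
        for (Path child : children) {
            collect(child, results);
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("listing");
        Path partition = Files.createDirectories(root.resolve("part=1"));
        Files.createFile(partition.resolve("a.parquet"));
        Path standalone = Files.createFile(root.resolve("b.parquet"));
        System.out.println(listInput(partition).size());                        // 1
        System.out.println(listInput(standalone).equals(List.of(standalone))); // true
    }
}
```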
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java:
##########
@@ -72,6 +73,71 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
vectorizedSelf = new VectorizedParquetInputFormat();
}
+ /**
+ * On blob storage with multiple recursive input directories, list them in parallel instead of the
+ * default serial per-directory listing that dominates split generation. Listed files flow through
+ * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
+ */
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ Path[] dirs = getInputPaths(job);
Review Comment:
`listStatus(JobConf)` now has substantial new behavior (parallel recursive
listing gated on blob storage + recursion + multiple dirs), but there are no
unit tests covering the new branch. Adding tests would help ensure it preserves
FileInputFormat semantics (e.g., file vs dir inputs, recursion gating, and blob
vs non-blob behavior).
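One property such tests could pin down is that the parallel path returns exactly the same set of files as the serial path. The sketch shows that check in plain JDK terms:

- `listSerially` and `listInParallel` are hypothetical stand-ins over `java.nio.file`.
- `listInParallel` mirrors the PR's one-task-per-dir `ExecutorCompletionService` fan-out.
- The comparison ignores order, because completion order is nondeterministic.

A real Hive test would instead drive `MapredParquetInputFormat.listStatus(JobConf)` against a local or mocked filesystem that the blob-storage check accepts.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class ListingEquivalence {
    private ListingEquivalence() {}

    /** Recursively lists regular files under one directory. */
    static List<Path> listOne(Path dir) throws IOException {
        try (Stream<Path> walk = Files.walk(dir)) {
            return walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }
    }

    static List<Path> listSerially(List<Path> dirs) throws IOException {
        List<Path> out = new ArrayList<>();
        for (Path d : dirs) {
            out.addAll(listOne(d));
        }
        return out;
    }

    /** One task per dir; results are gathered in completion order, not submission order. */
    static List<Path> listInParallel(List<Path> dirs, int threads) throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            CompletionService<List<Path>> cs = new ExecutorCompletionService<>(pool);
            for (Path d : dirs) {
                cs.submit(() -> listOne(d));
            }
            List<Path> out = new ArrayList<>();
            for (int left = dirs.size(); left > 0; left--) {
                try {
                    out.addAll(cs.take().get());
                } catch (ExecutionException e) {
                    throw new IOException("Failed to list input directories", e.getCause());
                }
            }
            return out;
        } finally {
            pool.shutdownNow();
        }
    }

    /** The property under test: same files, ignoring order. */
    static boolean sameFiles(List<Path> a, List<Path> b) {
        return a.size() == b.size() && new HashSet<>(a).equals(new HashSet<>(b));
    }

    public static void main(String[] args) throws Exception {
        Path root = Files.createTempDirectory("equiv");
        List<Path> dirs = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            Path d = root.resolve("p=" + i);
            Files.createDirectories(d.resolve("nested"));
            Files.createFile(d.resolve("g" + i));
            Files.createFile(d.resolve("nested").resolve("f" + i));
            dirs.add(d);
        }
        System.out.println(sameFiles(listSerially(dirs), listInParallel(dirs, 3))); // true
    }
}
```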
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java:
##########
@@ -72,6 +73,71 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
vectorizedSelf = new VectorizedParquetInputFormat();
}
+ /**
+ * On blob storage with multiple recursive input directories, list them in parallel instead of the
+ * default serial per-directory listing that dominates split generation. Listed files flow through
+ * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
+ */
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ Path[] dirs = getInputPaths(job);
+ // Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has
+ // subtler sub-directory semantics, so defer to the default.
+ if (dirs.length <= 1
+ || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
+ || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) {
+ return super.listStatus(job);
+ }
+
+ long start = System.currentTimeMillis();
+ // List as the caller's end-user, not the pool threads' login user; FileSystem.get is UGI-keyed.
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS));
+ ExecutorService pool = newWorkerPool(numThreads);
+ CompletionService<List<FileStatus>> completionService = new ExecutorCompletionService<>(pool);
+
+ List<FileStatus> files = new ArrayList<>();
+ try {
+ for (Path dir : dirs) {
+ completionService.submit(() -> ugi.doAs(
+ (PrivilegedExceptionAction<List<FileStatus>>) () -> {
+ FileSystem dirFs = dir.getFileSystem(job);
+ List<FileStatus> dirFiles = new ArrayList<>();
+ FileUtils.listStatusRecursively(dirFs, new FileStatus(0, true, 0, 0, 0, dir), dirFiles);
+ return dirFiles;
+ }));
+ }
+ for (int resultsLeft = dirs.length; resultsLeft > 0; resultsLeft--) {
+ files.addAll(completionService.take().get());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while listing input directories", e);
+
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException("Failed to list input directories", cause);
+ } finally {
Review Comment:
If a worker task is interrupted (e.g., due to `shutdownNow()` during error
handling), `ExecutionException.getCause()` can be an `InterruptedException`.
Currently this path wraps it in an `IOException` without restoring the
interrupt flag on the calling thread.
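Following this comment's suggestion, the `ExecutionException` unwrapping can be sketched as a small helper with three branches:

- An interrupted worker restores the calling thread's interrupt flag and yields an `InterruptedIOException`, a subclass of `IOException`, so the `throws IOException` signature of `listStatus` is unchanged.
- An `IOException` cause is rethrown as-is.
- Anything else is wrapped.

`ListingFailures` and `toIOException` are hypothetical names.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutionException;

public final class ListingFailures {
    private ListingFailures() {}

    /**
     * Hypothetical helper: converts a worker failure into an IOException for listStatus.
     * If the worker was interrupted, the calling thread's interrupt flag is restored first.
     */
    static IOException toIOException(ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof InterruptedException) {
            // Preserve the interruption signal instead of silently converting it to a plain IOException.
            Thread.currentThread().interrupt();
            InterruptedIOException iioe = new InterruptedIOException("Interrupted while listing input directories");
            iioe.initCause(cause);
            return iioe;
        }
        if (cause instanceof IOException) {
            return (IOException) cause;
        }
        return new IOException("Failed to list input directories", cause);
    }

    public static void main(String[] args) {
        IOException e = toIOException(new ExecutionException(new InterruptedException("worker interrupted")));
        System.out.println(e.getClass().getSimpleName()); // InterruptedIOException
        System.out.println(Thread.interrupted());          // true (and clears the flag)
    }
}
```

In the PR's `catch (ExecutionException e)` block, this would reduce to `throw toIOException(e);`. The `finally` block can still shut the pool down as before.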
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.