FrankChen021 commented on code in PR #19418:
URL: https://github.com/apache/druid/pull/19418#discussion_r3195588950


##########
extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java:
##########
@@ -153,39 +150,31 @@ public static Collection<Path> getPaths(List<String> 
inputPaths, Configuration c
       return Collections.emptySet();
     }
 
-    // Use FileInputFormat to read splits. To do this, we need to make a fake 
Job.
-    Job job = Job.getInstance(configuration);
-
-    // Add paths to the fake JobContext.
-    for (String inputPath : inputPaths) {
-      FileInputFormat.addInputPaths(job, inputPath);
+    final Set<Path> paths = new LinkedHashSet<>();
+    for (final String inputPath : inputPaths) {
+      final Path p = new Path(inputPath);

Review Comment:
   [P1] Comma-separated paths string no longer works
   
   The documented HDFS `paths` property can be a comma-separated string, and 
the previous `FileInputFormat.addInputPaths(job, inputPath)` split that string 
using Hadoop's path parser. The replacement constructs one `Path` from the 
entire string, so a spec like `"hdfs://nn/a.json,hdfs://nn/b.json"` is now 
treated as a single path/glob and will fail or match nothing. Split 
comma-separated string inputs before creating `Path`s, preserving Hadoop's 
escaping/brace behavior where possible.



##########
extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java:
##########
@@ -153,39 +150,31 @@ public static Collection<Path> getPaths(List<String> 
inputPaths, Configuration c
       return Collections.emptySet();
     }
 
-    // Use FileInputFormat to read splits. To do this, we need to make a fake 
Job.
-    Job job = Job.getInstance(configuration);
-
-    // Add paths to the fake JobContext.
-    for (String inputPath : inputPaths) {
-      FileInputFormat.addInputPaths(job, inputPath);
+    final Set<Path> paths = new LinkedHashSet<>();
+    for (final String inputPath : inputPaths) {
+      final Path p = new Path(inputPath);
+      final FileSystem fs = p.getFileSystem(configuration);
+      final FileStatus[] statuses = fs.globStatus(p);
+      if (statuses != null) {
+        for (final FileStatus status : statuses) {
+          addFilesRecursively(fs, status, paths);
+        }
+      }
     }
-
-    return new HdfsFileInputFormat().getSplits(job)
-                                    .stream()
-                                    .filter(split -> ((FileSplit) 
split).getLength() > 0)
-                                    .map(split -> ((FileSplit) 
split).getPath())
-                                    .collect(Collectors.toSet());
+    return paths;
   }
 
-  /**
-   * Helper for leveraging hadoop code to interpret HDFS paths with globs
-   */
-  private static class HdfsFileInputFormat extends FileInputFormat<Object, 
Object>
+  private static void addFilesRecursively(FileSystem fs, FileStatus status, 
Set<Path> paths) throws IOException
   {
-    @Override
-    public RecordReader<Object, Object> createRecordReader(
-        org.apache.hadoop.mapreduce.InputSplit inputSplit,
-        TaskAttemptContext taskAttemptContext
-    )
-    {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean isSplitable(JobContext context, Path filename)
-    {
-      return false;  // prevent generating extra paths
+    if (status.isDirectory()) {
+      final FileStatus[] children = fs.listStatus(status.getPath());

Review Comment:
   [P2] FileInputFormat filtering semantics were dropped
   
   The old `FileInputFormat` listing applied Hadoop's hidden-file filter, 
excluding path names starting with `_` or `.`, and only recursed into 
directories when `mapreduce.input.fileinputformat.input.dir.recursive` was 
enabled. The new direct `listStatus` traversal always recurses and never 
filters hidden files, so existing directory inputs can start ingesting nested 
data and non-empty marker/metadata files that were previously skipped. 
Reintroduce the same path filter and recursion behavior while avoiding the 
MapReduce token path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to