This is an automated email from the ASF dual-hosted git repository.

FrankChen021 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a207a6857e0 fix: Use FileSystem on getPaths() instead of mapreduce.Job 
(#19418)
a207a6857e0 is described below

commit a207a6857e0a0ba579f60501b1913bb575df812a
Author: Jiang Wu <[email protected]>
AuthorDate: Fri May 15 13:24:56 2026 +0200

    fix: Use FileSystem on getPaths() instead of mapreduce.Job (#19418)
    
    * fix: Use FileSystem on getPaths() instead of mapreduce.Job
    
    * fix: Use FileSystem on getPaths() instead of mapreduce.Job
---
 docs/ingestion/input-sources.md                    |   2 +-
 .../druid/inputsource/hdfs/HdfsInputSource.java    |  77 ++++---
 .../inputsource/hdfs/HdfsInputSourceTest.java      | 228 +++++++++++++++++++++
 3 files changed, 273 insertions(+), 34 deletions(-)

diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index a74e61db568..f2d4ce3afec 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -564,7 +564,7 @@ Sample specs:
 |Property|Description|Default|Required|
 |--------|-----------|-------|---------|
 |type|Set the value to `hdfs`.|None|yes|
-|paths|HDFS paths. Can be either a JSON array or comma-separated string of 
paths. Wildcards like `*` are supported in these paths. Empty files located 
under one of the given paths will be skipped.|None|yes|
+|paths|HDFS paths. Can be either a JSON array or comma-separated string of 
paths. Wildcards like `*` are supported in these paths.<br /><br />Empty files 
located under one of the given paths will be skipped. Hidden files and 
directories whose names start with `_` or `.` are automatically excluded.<br 
/><br />When a path points to a directory, only the immediate files in that 
directory are listed; subdirectories are not traversed. To ingest files from 
nested directories, use glob patterns [...]
 |systemFields|JSON array of system fields to return as part of input rows. 
Possible values: `__file_uri` (URI) and `__file_path` (path component of 
URI).|None|no|
 
 You can also ingest from other storage using the HDFS input source if the HDFS 
client supports that storage.
diff --git 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
index 464021736e6..1ec082fb906 100644
--- 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
+++ 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -48,13 +48,9 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
 import org.apache.druid.utils.Streams;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -66,6 +62,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -147,45 +144,59 @@ public class HdfsInputSource
     }
   }
 
+  /**
+   * Tells whether {@code path} names a hidden entry that must not be ingested. The rule mirrors the
+   * hidden-file filter of Hadoop's FileInputFormat: the last path component begins with '_' or '.'
+   * (for example "_SUCCESS" or ".staging").
+   */
+  private static boolean isHiddenPath(Path path)
+  {
+    final String lastComponent = path.getName();
+    if (lastComponent.isEmpty()) {
+      return false;
+    }
+    final char firstChar = lastComponent.charAt(0);
+    return firstChar == '_' || firstChar == '.';
+  }
+
+  /**
+   * Expands path specs into the concrete, non-empty files to ingest.
+   * <p>
+   * Each element of {@code inputPaths} may itself hold several comma-separated specs (see
+   * {@link #splitPathStrings(String)}). Every spec is globbed:
+   * <ul>
+   *   <li>matches whose name starts with '_' or '.' are dropped;</li>
+   *   <li>matching directories contribute their immediate non-hidden, non-empty files;</li>
+   *   <li>matching files are kept when non-empty.</li>
+   * </ul>
+   * A spec that matches nothing contributes nothing.
+   *
+   * @param inputPaths    path specs, possibly containing globs and comma-separated lists
+   * @param configuration Hadoop configuration used to resolve the FileSystem of each spec
+   * @return the matched files, de-duplicated, in discovery order
+   * @throws IOException if a FileSystem cannot be obtained or globbing/listing fails
+   */
   public static Collection<Path> getPaths(List<String> inputPaths, Configuration configuration) throws IOException
   {
     if (inputPaths.isEmpty()) {
       return Collections.emptySet();
     }
 
-    // Use FileInputFormat to read splits. To do this, we need to make a fake Job.
-    Job job = Job.getInstance(configuration);
-
-    // Add paths to the fake JobContext.
-    for (String inputPath : inputPaths) {
-      FileInputFormat.addInputPaths(job, inputPath);
+    // LinkedHashSet de-duplicates files matched by several specs while keeping discovery order stable.
+    final Set<Path> paths = new LinkedHashSet<>();
+    for (final String inputPath : inputPaths) {
+      // Brace-aware split: a plain comma split would break globs such as "/data/{2024,2025}/*" into two
+      // malformed patterns, which FileInputFormat.addInputPaths (used previously) did not do.
+      for (final String singlePath : splitPathStrings(inputPath)) {
+        final Path p = new Path(singlePath);
+        final FileSystem fs = p.getFileSystem(configuration);
+        final FileStatus[] statuses = fs.globStatus(p);
+        if (statuses == null) {
+          // No match for this spec.
+          continue;
+        }
+        for (final FileStatus status : statuses) {
+          if (isHiddenPath(status.getPath())) {
+            continue;
+          }
+          if (status.isDirectory()) {
+            addFilesFromDirectory(fs, status.getPath(), paths);
+          } else if (status.getLen() > 0) {
+            paths.add(status.getPath());
+          }
+        }
+      }
     }
-
-    return new HdfsFileInputFormat().getSplits(job)
-                                    .stream()
-                                    .filter(split -> ((FileSplit) split).getLength() > 0)
-                                    .map(split -> ((FileSplit) split).getPath())
-                                    .collect(Collectors.toSet());
+    return paths;
   }
+
+  /**
+   * Splits one {@code paths} string into individual path specs.
+   * <p>
+   * A comma acts as a separator only when it is outside every {@code {...}} glob alternation, so
+   * "/data/{a,b}/*" stays a single spec. This matches the brace-aware splitting of Hadoop's
+   * FileInputFormat.addInputPaths. A backslash escapes the following character, so "\," never splits and
+   * "\{" never opens a group; the backslash is kept in the output so the glob matcher still sees the
+   * escape. Empty specs, e.g. from a trailing comma, are dropped.
+   */
+  private static List<String> splitPathStrings(String commaSeparatedPaths)
+  {
+    final List<String> specs = new java.util.ArrayList<>();
+    final StringBuilder current = new StringBuilder();
+    int braceDepth = 0;
+    boolean escaped = false;
+    for (int i = 0; i < commaSeparatedPaths.length(); i++) {
+      final char c = commaSeparatedPaths.charAt(i);
+      if (escaped) {
+        escaped = false;
+      } else if (c == '\\') {
+        escaped = true;
+      } else if (c == '{') {
+        braceDepth++;
+      } else if (c == '}' && braceDepth > 0) {
+        braceDepth--;
+      } else if (c == ',' && braceDepth == 0) {
+        if (current.length() > 0) {
+          specs.add(current.toString());
+          current.setLength(0);
+        }
+        continue;
+      }
+      current.append(c);
+    }
+    if (current.length() > 0) {
+      specs.add(current.toString());
+    }
+    return specs;
+  }
 
   /**
-   * Helper for leveraging hadoop code to interpret HDFS paths with globs
+   * Adds to {@code paths} every regular file that sits directly inside {@code dir}, provided it is not hidden
+   * (name starting with '_' or '.') and has a length greater than zero.
+   * <p>
+   * Subdirectories are ignored rather than descended into. This mirrors Hadoop's FileInputFormat when
+   * mapreduce.input.fileinputformat.input.dir.recursive is left at its default.
    */
-  private static class HdfsFileInputFormat extends FileInputFormat<Object, Object>
+  private static void addFilesFromDirectory(FileSystem fs, Path dir, Set<Path> paths) throws IOException
   {
-    @Override
-    public RecordReader<Object, Object> createRecordReader(
-        org.apache.hadoop.mapreduce.InputSplit inputSplit,
-        TaskAttemptContext taskAttemptContext
-    )
-    {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean isSplitable(JobContext context, Path filename)
-    {
-      return false;  // prevent generating extra paths
+    final FileStatus[] entries = fs.listStatus(dir);
+    if (entries == null) {
+      return;
+    }
+    for (final FileStatus entry : entries) {
+      // Skip subdirectories (no recursion), hidden names, and zero-length files.
+      if (entry.isDirectory() || isHiddenPath(entry.getPath()) || entry.getLen() <= 0) {
+        continue;
+      }
+      paths.add(entry.getPath());
     }
   }
 
diff --git 
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
 
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
index 53575a248f7..13ea067b862 100644
--- 
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
+++ 
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -65,6 +65,7 @@ import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -490,6 +491,233 @@ public class HdfsInputSourceTest extends 
InitializedNullHandlingTest
     }
   }
 
+  /**
+   * Tests for {@link HdfsInputSource#getPaths}. They run against a LocalFileSystem rooted in a temporary
+   * folder, which is also used as the working directory.
+   */
+  public static class GetPathsTest
+  {
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    private FileSystem fileSystem;
+    private Configuration configuration;
+
+    @Before
+    public void setup() throws IOException
+    {
+      final File dir = temporaryFolder.getRoot();
+      configuration = new Configuration(true);
+      fileSystem = new LocalFileSystem();
+      fileSystem.initialize(dir.toURI(), configuration);
+      fileSystem.setWorkingDirectory(new Path(dir.getAbsolutePath()));
+    }
+
+    @After
+    public void teardown() throws IOException
+    {
+      fileSystem.close();
+    }
+
+    /**
+     * Creates {@code path} on the test file system and writes {@code contents} to it as UTF-8. Relative
+     * paths resolve against the working directory set in setup().
+     */
+    private void writeFile(Path path, String contents) throws IOException
+    {
+      try (Writer writer = new BufferedWriter(
+          new OutputStreamWriter(fileSystem.create(path), StandardCharsets.UTF_8)
+      )) {
+        writer.write(contents);
+      }
+    }
+
+    @Test
+    public void testGetPathsWithGlobMatchingNoFiles() throws IOException
+    {
+      final Collection<Path> paths = HdfsInputSource.getPaths(
+          Collections.singletonList(fileSystem.getWorkingDirectory() + "/nonexistent*"),
+          configuration
+      );
+      Assert.assertTrue(paths.isEmpty());
+    }
+
+    @Test
+    public void testGetPathsFiltersZeroLengthFiles() throws IOException
+    {
+      // Create an empty file (zero length)
+      final Path emptyFile = new Path("empty_file");
+      fileSystem.create(emptyFile).close();
+
+      // Create a non-empty file
+      final Path nonEmptyFile = new Path("non_empty_file");
+      writeFile(nonEmptyFile, "data");
+
+      final Collection<Path> paths = HdfsInputSource.getPaths(
+          Collections.singletonList(fileSystem.makeQualified(new Path("*_file")).toString()),
+          configuration
+      );
+
+      Assert.assertEquals(1, paths.size());
+      Assert.assertTrue(paths.contains(fileSystem.makeQualified(nonEmptyFile)));
+    }
+
+    @Test
+    public void testGetPathsWithMultipleInputPaths() throws IOException
+    {
+      final Path fileA = new Path("groupA_1");
+      writeFile(fileA, "a");
+
+      final Path fileB = new Path("groupB_1");
+      writeFile(fileB, "b");
+
+      final Collection<Path> paths = HdfsInputSource.getPaths(
+          Arrays.asList(
+              fileSystem.makeQualified(new Path("groupA*")).toString(),
+              fileSystem.makeQualified(new Path("groupB*")).toString()
+          ),
+          configuration
+      );
+
+      Assert.assertEquals(2, paths.size());
+      Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileA)));
+      Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileB)));
+    }
+
+    @Test
+    public void testGetPathsWithCommaSeparatedString() throws IOException
+    {
+      final Path fileA = new Path("comma_a");
+      writeFile(fileA, "a");
+
+      final Path fileB = new Path("comma_b");
+      writeFile(fileB, "b");
+
+      final String commaSeparated =
+          fileSystem.makeQualified(fileA) + "," + fileSystem.makeQualified(fileB);
+      final Collection<Path> paths = HdfsInputSource.getPaths(
+          Collections.singletonList(commaSeparated),
+          configuration
+      );
+
+      Assert.assertEquals(2, paths.size());
+      Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileA)));
+      Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileB)));
+    }
+
+    @Test
+    public void testGetPathsFiltersHiddenFiles() throws IOException
+    {
+      final Path visibleFile = new Path("visible");
+      writeFile(visibleFile, "data");
+
+      final Path dotFile = new Path(".hidden");
+      writeFile(dotFile, "data");
+
+      final Path underscoreFile = new Path("_metadata");
+      writeFile(underscoreFile, "data");
+
+      final Collection<Path> paths = HdfsInputSource.getPaths(
+          Collections.singletonList(fileSystem.makeQualified(new Path("*")).toString()),
+          configuration
+      );
+
+      Assert.assertEquals(1, paths.size());
+      Assert.assertTrue(paths.contains(fileSystem.makeQualified(visibleFile)));
+      Assert.assertFalse(paths.contains(fileSystem.makeQualified(dotFile)));
+      Assert.assertFalse(paths.contains(fileSystem.makeQualified(underscoreFile)));
+    }
+
+    @Test
+    public void testGetPathsDirectoryListsFilesNonRecursively() throws IOException
+    {
+      final Path dir = new Path("mydir");
+      fileSystem.mkdirs(dir);
+
+      final Path fileInDir = new Path(dir, "file1");
+      writeFile(fileInDir, "data");
+
+      // Create a nested subdirectory with a file -- should NOT be included
+      final Path subDir = new Path(dir, "subdir");
+      fileSystem.mkdirs(subDir);
+
+      final Path nestedFile = new Path(subDir, "nested_file");
+      writeFile(nestedFile, "nested");
+
+      // Create a hidden file in the directory -- should NOT be included
+      final Path hiddenInDir = new Path(dir, ".hidden_in_dir");
+      writeFile(hiddenInDir, "hidden");
+
+      final Collection<Path> paths = HdfsInputSource.getPaths(
+          Collections.singletonList(fileSystem.makeQualified(dir).toString()),
+          configuration
+      );
+
+      Assert.assertEquals(1, paths.size());
+      Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileInDir)));
+      Assert.assertFalse(paths.contains(fileSystem.makeQualified(nestedFile)));
+      Assert.assertFalse(paths.contains(fileSystem.makeQualified(hiddenInDir)));
+    }
+
+    @Test
+    public void testGetPathsSkipsHiddenDirectories() throws IOException
+    {
+      final Path visibleDir = new Path("visible_dir");
+      fileSystem.mkdirs(visibleDir);
+
+      final Path visibleFile = new Path(visibleDir, "data");
+      writeFile(visibleFile, "data");
+
+      final Path hiddenDir = new Path(".hidden_dir");
+      fileSystem.mkdirs(hiddenDir);
+
+      final Path hiddenFile = new Path(hiddenDir, "should_skip");
+      writeFile(hiddenFile, "skip");
+
+      final Collection<Path> paths = HdfsInputSource.getPaths(
+          Collections.singletonList(fileSystem.makeQualified(new Path("*dir")).toString()),
+          configuration
+      );
+
+      Assert.assertEquals(1, paths.size());
+      Assert.assertTrue(paths.contains(fileSystem.makeQualified(visibleFile)));
+      Assert.assertFalse(paths.contains(fileSystem.makeQualified(hiddenFile)));
+    }
+  }
+
   public static class EqualsTest
   {
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to