This is an automated email from the ASF dual-hosted git repository.
FrankChen021 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a207a6857e0 fix: Use FileSystem on getPaths() instead of mapreduce.Job
(#19418)
a207a6857e0 is described below
commit a207a6857e0a0ba579f60501b1913bb575df812a
Author: Jiang Wu <[email protected]>
AuthorDate: Fri May 15 13:24:56 2026 +0200
fix: Use FileSystem on getPaths() instead of mapreduce.Job (#19418)
* fix: Use FileSystem on getPaths() instead of mapreduce.Job
* fix: Use FileSystem on getPaths() instead of mapreduce.Job
---
docs/ingestion/input-sources.md | 2 +-
.../druid/inputsource/hdfs/HdfsInputSource.java | 77 ++++---
.../inputsource/hdfs/HdfsInputSourceTest.java | 228 +++++++++++++++++++++
3 files changed, 273 insertions(+), 34 deletions(-)
diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index a74e61db568..f2d4ce3afec 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -564,7 +564,7 @@ Sample specs:
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|Set the value to `hdfs`.|None|yes|
-|paths|HDFS paths. Can be either a JSON array or comma-separated string of
paths. Wildcards like `*` are supported in these paths. Empty files located
under one of the given paths will be skipped.|None|yes|
+|paths|HDFS paths. Can be either a JSON array or comma-separated string of
paths. Wildcards like `*` are supported in these paths.<br /><br />Empty files
located under one of the given paths will be skipped. Hidden files and
directories whose names start with `_` or `.` are automatically excluded.<br
/><br />When a path points to a directory, only the immediate files in that
directory are listed; subdirectories are not traversed. To ingest files from
nested directories, use glob patterns [...]
|systemFields|JSON array of system fields to return as part of input rows.
Possible values: `__file_uri` (URI) and `__file_path` (path component of
URI).|None|no|
You can also ingest from other storage using the HDFS input source if the HDFS
client supports that storage.
diff --git
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
index 464021736e6..1ec082fb906 100644
---
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
+++
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -48,13 +48,9 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
import org.apache.druid.utils.Streams;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -66,6 +62,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -147,45 +144,59 @@ public class HdfsInputSource
}
}
+ /**
+ * Returns whether the final component of {@code path} starts with '_' or '.',
+ * i.e. whether Hadoop's FileInputFormat hidden-file filter would reject it.
+ *
+ * @param path path to test; only {@link Path#getName()} (the last component) is
+ * inspected, so a hidden parent directory alone does not make the path hidden
+ * @return true if the path should be treated as hidden and skipped
+ */
+ private static boolean isHiddenPath(Path path)
+ {
+ final String name = path.getName();
+ return name.startsWith("_") || name.startsWith(".");
+ }
+
public static Collection<Path> getPaths(List<String> inputPaths,
Configuration configuration) throws IOException
{
if (inputPaths.isEmpty()) {
return Collections.emptySet();
}
- // Use FileInputFormat to read splits. To do this, we need to make a fake
Job.
- Job job = Job.getInstance(configuration);
-
- // Add paths to the fake JobContext.
- for (String inputPath : inputPaths) {
- FileInputFormat.addInputPaths(job, inputPath);
+ final Set<Path> paths = new LinkedHashSet<>();
+ for (final String inputPath : inputPaths) {
+ final String[] splitPaths =
org.apache.hadoop.util.StringUtils.split(inputPath);
+ for (final String singlePath : splitPaths) {
+ final Path p = new Path(singlePath);
+ final FileSystem fs = p.getFileSystem(configuration);
+ final FileStatus[] statuses = fs.globStatus(p);
+ if (statuses != null) {
+ for (final FileStatus status : statuses) {
+ if (isHiddenPath(status.getPath())) {
+ continue;
+ }
+ if (status.isDirectory()) {
+ addFilesFromDirectory(fs, status.getPath(), paths);
+ } else if (status.getLen() > 0) {
+ paths.add(status.getPath());
+ }
+ }
+ }
+ }
}
-
- return new HdfsFileInputFormat().getSplits(job)
- .stream()
- .filter(split -> ((FileSplit)
split).getLength() > 0)
- .map(split -> ((FileSplit)
split).getPath())
- .collect(Collectors.toSet());
+ return paths;
}
/**
- * Helper for leveraging hadoop code to interpret HDFS paths with globs
+ * Lists files in a directory non-recursively, matching the behavior of Hadoop's
+ * FileInputFormat when mapreduce.input.fileinputformat.input.dir.recursive is not
+ * set (the default). Hidden files (names starting with '_' or '.'),
+ * subdirectories, and zero-length files are skipped.
+ * <p>
+ * NOTE(review): no Configuration is consulted here, so setting the recursive flag
+ * named above in the Hadoop configuration has no effect on this input source.
+ *
+ * @param fs    file system that owns {@code dir}
+ * @param dir   directory whose immediate children are listed
+ * @param paths output set; qualifying children are added to it
+ * @throws IOException if listing {@code dir} fails
*/
- private static class HdfsFileInputFormat extends FileInputFormat<Object,
Object>
+ private static void addFilesFromDirectory(FileSystem fs, Path dir, Set<Path>
paths) throws IOException
{
- @Override
- public RecordReader<Object, Object> createRecordReader(
- org.apache.hadoop.mapreduce.InputSplit inputSplit,
- TaskAttemptContext taskAttemptContext
- )
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected boolean isSplitable(JobContext context, Path filename)
- {
- return false; // prevent generating extra paths
+ final FileStatus[] children = fs.listStatus(dir);
+ // Defensive: tolerate a null listing instead of throwing a NullPointerException.
+ if (children != null) {
+ for (final FileStatus child : children) {
+ // Non-recursive: directory entries are skipped, never descended into.
+ if (!child.isDirectory() && !isHiddenPath(child.getPath()) &&
child.getLen() > 0) {
+ paths.add(child.getPath());
+ }
+ }
}
}
diff --git
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
index 53575a248f7..13ea067b862 100644
---
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
+++
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -65,6 +65,7 @@ import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -490,6 +491,233 @@ public class HdfsInputSourceTest extends
InitializedNullHandlingTest
}
}
+ /**
+ * Tests for HdfsInputSource#getPaths against a LocalFileSystem rooted at a JUnit
+ * temporary folder. Relative paths created by the tests resolve against that
+ * folder because it is set as the file system's working directory.
+ */
+ public static class GetPathsTest
+ {
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private FileSystem fileSystem;
+ private Configuration configuration;
+
+ @Before
+ public void setup() throws IOException
+ {
+ final File dir = temporaryFolder.getRoot();
+ configuration = new Configuration(true);
+ fileSystem = new LocalFileSystem();
+ fileSystem.initialize(dir.toURI(), configuration);
+ // Relative Paths in the tests below resolve against the temporary folder.
+ fileSystem.setWorkingDirectory(new Path(dir.getAbsolutePath()));
+ }
+
+ @After
+ public void teardown() throws IOException
+ {
+ fileSystem.close();
+ }
+
+ @Test
+ public void testGetPathsWithGlobMatchingNoFiles() throws IOException
+ {
+ // A glob with no matches yields an empty result rather than an exception.
+ final Collection<Path> paths = HdfsInputSource.getPaths(
+ Collections.singletonList(fileSystem.getWorkingDirectory() +
"/nonexistent*"),
+ configuration
+ );
+ Assert.assertTrue(paths.isEmpty());
+ }
+
+ @Test
+ public void testGetPathsFiltersZeroLengthFiles() throws IOException
+ {
+ // Create an empty file (zero length)
+ final Path emptyFile = new Path("empty_file");
+ fileSystem.create(emptyFile).close();
+
+ // Create a non-empty file
+ final Path nonEmptyFile = new Path("non_empty_file");
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(fileSystem.create(nonEmptyFile),
StandardCharsets.UTF_8)
+ )) {
+ writer.write("data");
+ }
+
+ // The glob matches both files; only the non-empty one may be returned.
+ final Collection<Path> paths = HdfsInputSource.getPaths(
+ Collections.singletonList(fileSystem.makeQualified(new
Path("*_file")).toString()),
+ configuration
+ );
+
+ Assert.assertEquals(1, paths.size());
+
Assert.assertTrue(paths.contains(fileSystem.makeQualified(nonEmptyFile)));
+ }
+
+ @Test
+ public void testGetPathsWithMultipleInputPaths() throws IOException
+ {
+ final Path fileA = new Path("groupA_1");
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(fileSystem.create(fileA),
StandardCharsets.UTF_8)
+ )) {
+ writer.write("a");
+ }
+
+ final Path fileB = new Path("groupB_1");
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(fileSystem.create(fileB),
StandardCharsets.UTF_8)
+ )) {
+ writer.write("b");
+ }
+
+ // Each list entry is expanded separately and the matches are combined.
+ final Collection<Path> paths = HdfsInputSource.getPaths(
+ Arrays.asList(
+ fileSystem.makeQualified(new Path("groupA*")).toString(),
+ fileSystem.makeQualified(new Path("groupB*")).toString()
+ ),
+ configuration
+ );
+
+ Assert.assertEquals(2, paths.size());
+ Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileA)));
+ Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileB)));
+ }
+
+ @Test
+ public void testGetPathsWithCommaSeparatedString() throws IOException
+ {
+ final Path fileA = new Path("comma_a");
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(fileSystem.create(fileA),
StandardCharsets.UTF_8)
+ )) {
+ writer.write("a");
+ }
+
+ final Path fileB = new Path("comma_b");
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(fileSystem.create(fileB),
StandardCharsets.UTF_8)
+ )) {
+ writer.write("b");
+ }
+
+ // A single list entry may hold several comma-separated paths.
+ final String commaSeparated =
+ fileSystem.makeQualified(fileA) + "," +
fileSystem.makeQualified(fileB);
+ final Collection<Path> paths = HdfsInputSource.getPaths(
+ Collections.singletonList(commaSeparated),
+ configuration
+ );
+
+ Assert.assertEquals(2, paths.size());
+ Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileA)));
+ Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileB)));
+ }
+
+ @Test
+ public void testGetPathsFiltersHiddenFiles() throws IOException
+ {
+ final Path visibleFile = new Path("visible");
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(fileSystem.create(visibleFile),
StandardCharsets.UTF_8)
+ )) {
+ writer.write("data");
+ }
+
+ final Path dotFile = new Path(".hidden");
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(fileSystem.create(dotFile),
StandardCharsets.UTF_8)
+ )) {
+ writer.write("data");
+ }
+
+ final Path underscoreFile = new Path("_metadata");
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(fileSystem.create(underscoreFile),
StandardCharsets.UTF_8)
+ )) {
+ writer.write("data");
+ }
+
+ // Names starting with '.' or '_' must be excluded even if the glob would match them.
+ final Collection<Path> paths = HdfsInputSource.getPaths(
+ Collections.singletonList(fileSystem.makeQualified(new
Path("*")).toString()),
+ configuration
+ );
+
+ Assert.assertEquals(1, paths.size());
+ Assert.assertTrue(paths.contains(fileSystem.makeQualified(visibleFile)));
+ Assert.assertFalse(paths.contains(fileSystem.makeQualified(dotFile)));
+
Assert.assertFalse(paths.contains(fileSystem.makeQualified(underscoreFile)));
+ }
+
+ @Test
+ public void testGetPathsDirectoryListsFilesNonRecursively() throws
IOException
+ {
+ final Path dir = new Path("mydir");
+ fileSystem.mkdirs(dir);
+
+ final Path fileInDir = new Path(dir, "file1");
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(fileSystem.create(fileInDir),
StandardCharsets.UTF_8)
+ )) {
+ writer.write("data");
+ }
+
+ // Create a nested subdirectory with a file -- should NOT be included
+ final Path subDir = new Path(dir, "subdir");
+ fileSystem.mkdirs(subDir);
+
+ final Path nestedFile = new Path(subDir, "nested_file");
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(fileSystem.create(nestedFile),
StandardCharsets.UTF_8)
+ )) {
+ writer.write("nested");
+ }
+
+ // Create a hidden file in the directory -- should NOT be included
+ final Path hiddenInDir = new Path(dir, ".hidden_in_dir");
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(fileSystem.create(hiddenInDir),
StandardCharsets.UTF_8)
+ )) {
+ writer.write("hidden");
+ }
+
+ // Passing the directory itself (no glob) lists only its immediate, visible files.
+ final Collection<Path> paths = HdfsInputSource.getPaths(
+ Collections.singletonList(fileSystem.makeQualified(dir).toString()),
+ configuration
+ );
+
+ Assert.assertEquals(1, paths.size());
+ Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileInDir)));
+ }
+
+ @Test
+ public void testGetPathsSkipsHiddenDirectories() throws IOException
+ {
+ final Path visibleDir = new Path("visible_dir");
+ fileSystem.mkdirs(visibleDir);
+
+ final Path visibleFile = new Path(visibleDir, "data");
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(fileSystem.create(visibleFile),
StandardCharsets.UTF_8)
+ )) {
+ writer.write("data");
+ }
+
+ final Path hiddenDir = new Path(".hidden_dir");
+ fileSystem.mkdirs(hiddenDir);
+
+ final Path hiddenFile = new Path(hiddenDir, "should_skip");
+ try (Writer writer = new BufferedWriter(
+ new OutputStreamWriter(fileSystem.create(hiddenFile),
StandardCharsets.UTF_8)
+ )) {
+ writer.write("skip");
+ }
+
+ // "*dir" can match both directories; the hidden one must not be listed.
+ final Collection<Path> paths = HdfsInputSource.getPaths(
+ Collections.singletonList(fileSystem.makeQualified(new
Path("*dir")).toString()),
+ configuration
+ );
+
+ Assert.assertEquals(1, paths.size());
+ Assert.assertTrue(paths.contains(fileSystem.makeQualified(visibleFile)));
+ Assert.assertFalse(paths.contains(fileSystem.makeQualified(hiddenFile)));
+ }
+ }
+
public static class EqualsTest
{
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]