This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new af5ef4d [HUDI-1330] handle prefix filtering at directory level (#2157)
af5ef4d is described below
commit af5ef4d49d9c7eb610c446d8058216f39bb2fcfa
Author: Ho Tien Vu <[email protected]>
AuthorDate: Wed Oct 21 14:20:19 2020 +0800
[HUDI-1330] handle prefix filtering at directory level (#2157)
The current DFSPathSelector only ignores prefixes (_, .) at the file level,
while files under subdirectories,
e.g. (.checkpoint/*), are still considered, which results in a bad-format
exception during reading.
---
.../utilities/sources/helpers/DFSPathSelector.java | 49 ++++++++++++----------
.../sources/AbstractDFSSourceTestBase.java | 13 ++++++
2 files changed, 40 insertions(+), 22 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index 6b58003..47419e0 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -31,16 +31,15 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -63,7 +62,7 @@ public class DFSPathSelector {
protected final TypedProperties props;
public DFSPathSelector(TypedProperties props, Configuration hadoopConf) {
- DataSourceUtils.checkRequiredProperties(props,
Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
+ DataSourceUtils.checkRequiredProperties(props,
Collections.singletonList(Config.ROOT_INPUT_PATH_PROP));
this.props = props;
this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP),
hadoopConf);
}
@@ -101,18 +100,8 @@ public class DFSPathSelector {
try {
// obtain all eligible files under root folder.
log.info("Root path => " + props.getString(Config.ROOT_INPUT_PATH_PROP)
+ " source limit => " + sourceLimit);
- List<FileStatus> eligibleFiles = new ArrayList<>();
- RemoteIterator<LocatedFileStatus> fitr =
- fs.listFiles(new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)),
true);
- while (fitr.hasNext()) {
- LocatedFileStatus fileStatus = fitr.next();
- if (fileStatus.isDirectory()
- || fileStatus.getLen() == 0
- || IGNORE_FILEPREFIX_LIST.stream().anyMatch(pfx ->
fileStatus.getPath().getName().startsWith(pfx))) {
- continue;
- }
- eligibleFiles.add(fileStatus);
- }
+ long lastCheckpointTime =
lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+ List<FileStatus> eligibleFiles = listEligibleFiles(fs, new
Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), lastCheckpointTime);
// sort them by modification time.
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
// Filter based on checkpoint & input size, if needed
@@ -120,11 +109,6 @@ public class DFSPathSelector {
long maxModificationTime = Long.MIN_VALUE;
List<FileStatus> filteredFiles = new ArrayList<>();
for (FileStatus f : eligibleFiles) {
- if (lastCheckpointStr.isPresent() && f.getModificationTime() <=
Long.valueOf(lastCheckpointStr.get()).longValue()) {
- // skip processed files
- continue;
- }
-
if (currentBytes + f.getLen() >= sourceLimit) {
// we have enough data, we are done
break;
@@ -136,7 +120,7 @@ public class DFSPathSelector {
}
// no data to read
- if (filteredFiles.size() == 0) {
+ if (filteredFiles.isEmpty()) {
return new ImmutablePair<>(Option.empty(),
lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
}
@@ -148,4 +132,25 @@ public class DFSPathSelector {
throw new HoodieIOException("Unable to read from source from checkpoint:
" + lastCheckpointStr, ioe);
}
}
+
+ /**
+ * List files recursively, filtering out ineligible files/directories while
doing so.
+ */
+ private List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long
lastCheckpointTime) throws IOException {
+ // skip files/dirs whose names start with (_, ., etc)
+ FileStatus[] statuses = fs.listStatus(path, file ->
+ IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx ->
file.getName().startsWith(pfx)));
+ List<FileStatus> res = new ArrayList<>();
+ for (FileStatus status: statuses) {
+ if (status.isDirectory()) {
+ // avoid infinite loop
+ if (!status.isSymlink()) {
+ res.addAll(listEligibleFiles(fs, status.getPath(),
lastCheckpointTime));
+ }
+ } else if (status.getModificationTime() > lastCheckpointTime &&
status.getLen() > 0) {
+ res.add(status);
+ }
+ }
+ return res;
+ }
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
index f63f3e9..e02d00c 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
@@ -175,5 +175,18 @@ public abstract class AbstractDFSSourceTestBase extends
UtilitiesTestBase {
InputBatch<JavaRDD<GenericRecord>> fetch5 =
sourceFormatAdapter.fetchNewDataInAvroFormat(
Option.empty(), Long.MAX_VALUE);
assertEquals(10100, fetch5.getBatch().get().count());
+
+ // 6. Should skip files/directories whose names start with prefixes ("_",
".")
+ generateOneFile(".checkpoint/3", "002", 100);
+ generateOneFile("_checkpoint/3", "002", 100);
+ generateOneFile(".3", "002", 100);
+ generateOneFile("_3", "002", 100);
+ // also work with nested directory
+ generateOneFile("foo/.bar/3", "002", 1); // not ok
+ generateOneFile("foo/bar/3", "002", 1); // ok
+ // fetch everything from the beginning
+ InputBatch<JavaRDD<GenericRecord>> fetch6 =
sourceFormatAdapter.fetchNewDataInAvroFormat(
+ Option.empty(), Long.MAX_VALUE);
+ assertEquals(10101, fetch6.getBatch().get().count());
}
}