This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bdbbfe18c [GOBBLIN-2005] Use correct root path when finding relative
paths on destination side (#3882)
bdbbfe18c is described below
commit bdbbfe18cff0acee7833f148843cd371eaca8bdd
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Feb 23 15:37:45 2024 -0800
[GOBBLIN-2005] Use correct root path when finding relative paths on
destination side (#3882)
---
.../copy/UnixTimestampRecursiveCopyableDataset.java | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
index cd4f18867..366249b30 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
@@ -41,6 +41,8 @@ import org.joda.time.format.PeriodFormatterBuilder;
import com.google.common.collect.Lists;
+import lombok.AllArgsConstructor;
+
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.filters.AndPathFilter;
@@ -88,22 +90,23 @@ public class UnixTimestampRecursiveCopyableDataset extends
RecursiveCopyableData
* based on {@link #timestampPattern} and filters out the paths that are outside
the date range
*
*/
+ @AllArgsConstructor
class TimestampPathFilter implements PathFilter {
+ private final Path path;
@Override
public boolean accept(Path path) {
LocalDate endDate = currentTime.toLocalDate();
LocalDate startDate = endDate.minus(lookbackPeriod);
- Path relativePath =
PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(path),
datasetRoot());
+ Path relativePath =
PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(path),
this.path);
Matcher matcher = timestampPattern.matcher(relativePath.toString());
if (!matcher.matches()) {
return false;
}
Long timestamp = Long.parseLong(matcher.group(1));
LocalDate dateOfTimestamp = new LocalDateTime(timestamp,
dateTimeZone).toLocalDate();
- return !(dateOfTimestamp == null || dateOfTimestamp.isAfter(endDate) ||
dateOfTimestamp.isEqual(startDate)
- || dateOfTimestamp.isBefore(startDate));
+ return !(dateOfTimestamp.isAfter(endDate) ||
dateOfTimestamp.isEqual(startDate) || dateOfTimestamp.isBefore(startDate));
}
}
@@ -111,8 +114,8 @@ public class UnixTimestampRecursiveCopyableDataset extends
RecursiveCopyableData
protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path,
PathFilter fileFilter)
throws IOException {
- // Filter files by lookback period (fileNames >= startDate and fileNames
<= endDate)
- PathFilter andPathFilter = new AndPathFilter(fileFilter, new
TimestampPathFilter());
+ // Filter files by lookback period (fileNames > startDate and fileNames <=
endDate)
+ PathFilter andPathFilter = new AndPathFilter(fileFilter, new
TimestampPathFilter(path));
List<FileStatus> files = super.getFilesAtPath(fs, path, andPathFilter);
if (VersionSelectionPolicy.ALL == versionSelectionPolicy) {
@@ -122,7 +125,7 @@ public class UnixTimestampRecursiveCopyableDataset extends
RecursiveCopyableData
Map<Pair<String, LocalDate>, TreeMap<Long, List<FileStatus>>>
pathTimestampFilesMap = new HashMap<>();
// Now select files per day based on version selection policy
for (FileStatus fileStatus : files) {
- String relativePath =
PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()),
datasetRoot()).toString();
+ String relativePath =
PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()),
path).toString();
Matcher matcher = timestampPattern.matcher(relativePath);
if (!matcher.matches()) {
continue;