This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 78484e23f [GOBBLIN-1708] Improve TimeAwareRecursiveCopyableDataset to
lookback only into datefolders that match range (#3563)
78484e23f is described below
commit 78484e23f0b5cdc5098f7438c22705db425140d2
Author: Andy Jiang <[email protected]>
AuthorDate: Thu Sep 22 16:46:11 2022 -0700
[GOBBLIN-1708] Improve TimeAwareRecursiveCopyableDataset to lookback only
into datefolders that match range (#3563)
* Check datetime range validity prior to recursing
* Remove unused packages
* Remove extra line
* Reformat function
* Check string prior to parsing
* removed unused import
* Change checkPathDateTimeValidity to use available LocalDateTime library
parsing functions
* Change to isEmpty
* Modify check path to be flexible
* Update javadoc
* Add unit tests and refactor
---
.../copy/TimeAwareRecursiveCopyableDataset.java | 31 ++++++++++++++++++
.../TimeAwareRecursiveCopyableDatasetTest.java | 37 ++++++++++++++++++++++
2 files changed, 68 insertions(+)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
index fa946e475..dab563c3d 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
@@ -134,9 +134,40 @@ public class TimeAwareRecursiveCopyableDataset extends
RecursiveCopyableDataset
return recursivelyGetFilesAtDatePath(fs, path, "", fileFilter, 1,
startDate, endDate, formatter);
}
+ /**
+  * Checks whether a partially traversed date path falls within the start and end dates, inclusive.
+  * <p>
+  * The first {@code level - 1} components of {@code datePathFormat} form the pattern used to parse
+  * {@code datePath}. {@code startDate} and {@code endDate} are rounded to that same granularity (by
+  * formatting and re-parsing them with the truncated pattern) before the comparison is made.
+  *
+  * @param startDate lower bound of the range, inclusive
+  * @param endDate upper bound of the range, inclusive
+  * @param datePath the date folders traversed so far, separated by "/" (for example "2022/11")
+  * @param datePathFormat the full user-set date pattern, separated by "/" (for example "yyyy/MM/dd/HH")
+  * @param level one more than the number of date components in {@code datePath}; must lie between 2 and
+  *              the number of components in {@code datePathFormat} plus 1
+  * @return true if {@code datePath} parses with the truncated pattern and lies within the rounded range;
+  *         false if it is out of range, cannot be parsed, or {@code level} does not fit {@code datePathFormat}
+  */
+ public static Boolean checkPathDateTimeValidity(LocalDateTime startDate, LocalDateTime endDate, String datePath,
+     String datePathFormat, int level) {
+   String[] datePathFormatArray = datePathFormat.split("/");
+   int traversedDepth = level - 1;
+   // Without this guard subList throws for a level outside the format's depth, and that exception is not
+   // covered by the catch below, so it would escape to the caller.
+   if (traversedDepth < 1 || traversedDepth > datePathFormatArray.length) {
+     log.error(String.format("Level %d does not match any granularity of date format %s", level, datePathFormat));
+     return false;
+   }
+   // Rejoin with the same "/" the format was split on, so the truncated pattern is an exact prefix of
+   // datePathFormat on every platform. NOTE(review): assumes callers pass a "/"-separated datePath - confirm.
+   String datePathPattern = String.join("/", Arrays.asList(datePathFormatArray).subList(0, traversedDepth));
+   try {
+     DateTimeFormatter formatGranularity = DateTimeFormat.forPattern(datePathPattern);
+     LocalDateTime traversedDatePathRound = formatGranularity.parseLocalDateTime(datePath);
+     // Format then re-parse to drop every field finer than the traversed granularity.
+     LocalDateTime startDateRound = formatGranularity.parseLocalDateTime(startDate.toString(datePathPattern));
+     LocalDateTime endDateRound = formatGranularity.parseLocalDateTime(endDate.toString(datePathPattern));
+     return !traversedDatePathRound.isBefore(startDateRound) && !traversedDatePathRound.isAfter(endDateRound);
+   } catch (IllegalArgumentException e) {
+     log.error(String.format("Cannot parse path provided %s, expected in format of %s", datePath, datePathFormat));
+     return false;
+   }
+ }
+
private List<FileStatus> recursivelyGetFilesAtDatePath(FileSystem fs, Path
path, String traversedDatePath, PathFilter fileFilter,
int level, LocalDateTime startDate, LocalDateTime endDate,
DateTimeFormatter formatter) throws IOException {
List<FileStatus> fileStatuses = Lists.newArrayList();
+ if (!traversedDatePath.isEmpty()) {
+ if (!checkPathDateTimeValidity(startDate, endDate, traversedDatePath,
this.datePattern, level)) {
+ return fileStatuses;
+ }
+ }
Iterator<FileStatus> folderIterator;
try {
if (!fs.exists(path)) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
index 4f44a9909..b1b474c9b 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
@@ -335,6 +335,43 @@ public class TimeAwareRecursiveCopyableDatasetTest {
new Path("/tmp/src/ds2/daily"));
}
+ /**
+  * Exercises checkPathDateTimeValidity at every folder depth. The level passed is always one more than
+  * the number of components in the date path; level 1 (an empty date path) is handled by
+  * recursivelyGetFilesAtDatePath and is therefore not covered here.
+  */
+ @Test
+ public void testCheckPathDateTimeValidity() {
+   String hourlyPattern = "yyyy/MM/dd/HH";
+   DateTimeFormatter hourlyFormatter = DateTimeFormat.forPattern(hourlyPattern);
+   LocalDateTime hourlyStart = LocalDateTime.parse("2022/11/30/23", hourlyFormatter);
+   LocalDateTime hourlyEnd = LocalDateTime.parse("2022/12/30/23", hourlyFormatter);
+
+   // The year is already outside the range, so every depth must be rejected
+   String[] wrongYearPaths = {"2023", "2023/11", "2023/11/30", "2023/11/30/20"};
+   for (int i = 0; i < wrongYearPaths.length; i++) {
+     Assert.assertFalse(TimeAwareRecursiveCopyableDataset.checkPathDateTimeValidity(hourlyStart, hourlyEnd,
+         wrongYearPaths[i], hourlyPattern, i + 2));
+   }
+
+   // Year, month and day match the range; only the hour falls before the start
+   String[] matchingPrefixes = {"2022", "2022/11", "2022/11/30"};
+   for (int i = 0; i < matchingPrefixes.length; i++) {
+     Assert.assertTrue(TimeAwareRecursiveCopyableDataset.checkPathDateTimeValidity(hourlyStart, hourlyEnd,
+         matchingPrefixes[i], hourlyPattern, i + 2));
+   }
+   Assert.assertFalse(TimeAwareRecursiveCopyableDataset.checkPathDateTimeValidity(hourlyStart, hourlyEnd,
+       "2022/11/30/20", hourlyPattern, 5));
+
+   // Minute-granularity pattern: every depth of the path lies inside the range
+   String minutePattern = "yyyy/MM/dd/HH/mm";
+   DateTimeFormatter minuteFormatter = DateTimeFormat.forPattern(minutePattern);
+   LocalDateTime minuteStart = LocalDateTime.parse("2022/11/30/23/59", minuteFormatter);
+   LocalDateTime minuteEnd = LocalDateTime.parse("2022/12/30/23/59", minuteFormatter);
+   String[] inRangePaths = {"2022", "2022/12", "2022/12/15", "2022/12/15/15", "2022/12/15/15/30"};
+   for (int i = 0; i < inRangePaths.length; i++) {
+     Assert.assertTrue(TimeAwareRecursiveCopyableDataset.checkPathDateTimeValidity(minuteStart, minuteEnd,
+         inRangePaths[i], minutePattern, i + 2));
+   }
+
+   // Paths that cannot be parsed with the date pattern are rejected
+   String[] unparseablePaths = {"test", "test/test", "test/test/test"};
+   for (int i = 0; i < unparseablePaths.length; i++) {
+     Assert.assertFalse(TimeAwareRecursiveCopyableDataset.checkPathDateTimeValidity(minuteStart, minuteEnd,
+         unparseablePaths[i], minutePattern, i + 2));
+   }
+ }
+
@AfterClass
public void clean() throws IOException {
//Delete tmp directories