phet commented on code in PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#discussion_r925243204
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -49,14 +52,11 @@ public class TimeAwareRecursiveCopyableDataset extends
RecursiveCopyableDataset
private final String lookbackTime;
private final String datePattern;
private final Period lookbackPeriod;
- private final boolean isPatternDaily;
- private final boolean isPatternHourly;
- private final boolean isPatternMinutely;
private final LocalDateTime currentTime;
- private final DatePattern pattern;
+ private final DatePattern patternQualifier;
enum DatePattern {
- MINUTELY, HOURLY, DAILY
+ SECONDLY, MINUTELY, HOURLY, DAILY
Review Comment:
I can't think of better names (and understand how these result from applying
a general rule), but I do stumble when reading these according to the common
defs, "of the second thing" and "in fine detail" (!) ;)
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -108,6 +101,9 @@ public LocalDateTime next() {
LocalDateTime dateTime = startDate;
switch (datePattern) {
+ case SECONDLY:
+ startDate = startDate.plusSeconds(1);
+ break;
case MINUTELY:
startDate = startDate.plusMinutes(1);
Review Comment:
it's interesting to see how firmly tied this finder is to specific time
semantics (especially time zones). if e.g. the dir producer were not timezone
aware (or used a broken/presumed calculation) to create a date folder like
2:30am on the day the finder's TZ actually has it "springing forward" (from 2am
-> 3am) due to DST, this finder would miss that folder. probably not a big
risk, all things considered, but worth noting
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java:
##########
@@ -216,6 +223,40 @@ public void testGetFilesAtPath() throws IOException {
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
}
+ // test ds of daily/yyyy-MM-dd-HH-mm-ss
+ datePattern = "yyyy-MM-dd-HH-mm-ss";
+ formatter = DateTimeFormat.forPattern(datePattern);
+ endDate =
LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+
+ candidateFiles = new HashSet<>();
+ for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
+ String startDate =
endDate.minusDays(i).withMinuteOfHour(random.nextInt(60)).withSecondOfMinute(random.nextInt(60)).toString(formatter);
Review Comment:
does using random values mean the test is not necessarily deterministic?
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -128,55 +124,83 @@ public void remove() {
}
}
- private boolean isDatePatternHourly(String datePattern) {
- DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
- LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0);
- String refDateTimeString = refDateTime.toString(formatter);
- LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
- String refDateTimeStringAtStartOfDay =
refDateTimeAtStartOfDay.toString(formatter);
- return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
- }
-
- private boolean isDatePatternMinutely(String datePattern) {
+ private DatePattern validateLookbackWithDatePatternFormat(String
datePattern, String lookbackTime) {
DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
- LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
+ LocalDateTime refDateTime = new LocalDateTime(2017, 01, 31, 10, 59, 59);
String refDateTimeString = refDateTime.toString(formatter);
- LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
- String refDateTimeStringAtStartOfHour =
refDateTimeAtStartOfHour.toString(formatter);
- return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
- }
+ PeriodFormatterBuilder formatterBuilder;
- private boolean isLookbackTimeStringDaily(String lookbackTime) {
- PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
- try {
- periodFormatter.parsePeriod(lookbackTime);
- return true;
- } catch (Exception e) {
- return false;
+ // Validate that the lookback is supported for the time format
+ if
(!refDateTime.withSecondOfMinute(0).toString(formatter).equals(refDateTimeString))
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").appendSeconds().appendSuffix("s");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly or minutely or secondly format, check %s",
LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.SECONDLY;
+ } else if
(!refDateTime.withMinuteOfHour(0).toString(formatter).equals(refDateTimeString))
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly or minutely format, check %s",
LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.MINUTELY;
+ } else if
(!refDateTime.withHourOfDay(0).toString(formatter).equals(refDateTimeString)) {
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly format, check %s", LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.HOURLY;
+ } else if
(!refDateTime.withDayOfMonth(1).toString(formatter).equals(refDateTimeString) )
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily format, check %s", LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.DAILY;
}
+ return null;
}
- private boolean isLookbackTimeStringHourly(String lookbackTime) {
- PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+ private boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder
formatterBuilder, String lookbackTime) {
try {
- periodFormatter.parsePeriod(lookbackTime);
- return true;
- } catch (Exception e) {
+ formatterBuilder.toFormatter().parsePeriod(lookbackTime);
+ } catch (IllegalArgumentException e) {
return false;
}
+ return true;
}
@Override
protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path,
PathFilter fileFilter) throws IOException {
- DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+ DateTimeFormatter formatter = DateTimeFormat.forPattern(this.datePattern);
LocalDateTime endDate = currentTime;
LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
-
- DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate,
endDate, pattern);
List<FileStatus> fileStatuses = Lists.newArrayList();
- while (dateRangeIterator.hasNext()) {
- Path pathWithDateTime = new Path(path,
dateRangeIterator.next().toString(formatter));
- fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime,
fileFilter));
+
+ // Data inside of nested folders representing timestamps need to be
fetched differently
+ if (datePattern.contains(FileSystems.getDefault().getSeparator())) {
+ // Use an iterator that traverses through all times from lookback to
current time, based on format
+ DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate,
endDate, this.patternQualifier);
+ while (dateRangeIterator.hasNext()) {
+ Path pathWithDateTime = new Path(path,
dateRangeIterator.next().toString(formatter));
+ if (!fs.exists(pathWithDateTime)) {
+ continue;
+ }
+ fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime,
fileFilter));
+ }
+ } else {
+ // Look at the top level directories and compare if those fit into the
date format
+ Iterator<FileStatus> folderIterator =
Arrays.asList(fs.listStatus(path)).iterator();
+ while (folderIterator.hasNext()) {
+ Path folderPath = folderIterator.next().getPath();
+ String datePath =
folderPath.getName().split("/")[folderPath.getName().split("/").length -1];
+ try {
+ LocalDateTime folderDate = formatter.parseLocalDateTime(datePath);
+ if (folderDate.isAfter(startDate) && folderDate.isBefore(endDate)) {
Review Comment:
the iterator based approach was `!isAfter(endDate)`, therefore wouldn't
equivalent here be "is before or equal to" `endDate`?
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -128,55 +124,83 @@ public void remove() {
}
}
- private boolean isDatePatternHourly(String datePattern) {
- DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
- LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0);
- String refDateTimeString = refDateTime.toString(formatter);
- LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
- String refDateTimeStringAtStartOfDay =
refDateTimeAtStartOfDay.toString(formatter);
- return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
- }
-
- private boolean isDatePatternMinutely(String datePattern) {
+ private DatePattern validateLookbackWithDatePatternFormat(String
datePattern, String lookbackTime) {
DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
- LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
+ LocalDateTime refDateTime = new LocalDateTime(2017, 01, 31, 10, 59, 59);
String refDateTimeString = refDateTime.toString(formatter);
- LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
- String refDateTimeStringAtStartOfHour =
refDateTimeAtStartOfHour.toString(formatter);
- return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
- }
+ PeriodFormatterBuilder formatterBuilder;
- private boolean isLookbackTimeStringDaily(String lookbackTime) {
- PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
- try {
- periodFormatter.parsePeriod(lookbackTime);
- return true;
- } catch (Exception e) {
- return false;
+ // Validate that the lookback is supported for the time format
+ if
(!refDateTime.withSecondOfMinute(0).toString(formatter).equals(refDateTimeString))
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").appendSeconds().appendSuffix("s");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly or minutely or secondly format, check %s",
LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.SECONDLY;
+ } else if
(!refDateTime.withMinuteOfHour(0).toString(formatter).equals(refDateTimeString))
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly or minutely format, check %s",
LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.MINUTELY;
+ } else if
(!refDateTime.withHourOfDay(0).toString(formatter).equals(refDateTimeString)) {
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly format, check %s", LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.HOURLY;
+ } else if
(!refDateTime.withDayOfMonth(1).toString(formatter).equals(refDateTimeString) )
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily format, check %s", LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.DAILY;
}
+ return null;
}
- private boolean isLookbackTimeStringHourly(String lookbackTime) {
- PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+ private boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder
formatterBuilder, String lookbackTime) {
try {
- periodFormatter.parsePeriod(lookbackTime);
- return true;
- } catch (Exception e) {
+ formatterBuilder.toFormatter().parsePeriod(lookbackTime);
+ } catch (IllegalArgumentException e) {
return false;
}
+ return true;
}
@Override
protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path,
PathFilter fileFilter) throws IOException {
- DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+ DateTimeFormatter formatter = DateTimeFormat.forPattern(this.datePattern);
LocalDateTime endDate = currentTime;
LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
-
- DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate,
endDate, pattern);
List<FileStatus> fileStatuses = Lists.newArrayList();
- while (dateRangeIterator.hasNext()) {
- Path pathWithDateTime = new Path(path,
dateRangeIterator.next().toString(formatter));
- fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime,
fileFilter));
+
+ // Data inside of nested folders representing timestamps need to be
fetched differently
+ if (datePattern.contains(FileSystems.getDefault().getSeparator())) {
+ // Use an iterator that traverses through all times from lookback to
current time, based on format
+ DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate,
endDate, this.patternQualifier);
+ while (dateRangeIterator.hasNext()) {
+ Path pathWithDateTime = new Path(path,
dateRangeIterator.next().toString(formatter));
+ if (!fs.exists(pathWithDateTime)) {
Review Comment:
wow, that's a lot of FS fstat access: from 1440 when by-minute to 86400 when
by-second--and that's just each day! I'm sure we all agree "premature
optimization is the root of all evil"... but still I have serious concerns that
such scanning will exceed the acceptable load on the host FS. worst would be
if users default to this as a way to 'keep their options open', even when in
practice they don't require nearly such fine-grained differentiation between
the dirs they create.
(clearly my suggestions here imply a fundamental rethinking of the impl...)
first off I note that the iterator's stride is always just one. a
sequential scan of the entire space is generally unnecessary merely to test
interval membership. I expect that was the same intuition leading you to
develop the `else` case (when no dir separator detected)... I'm just suggesting
you go farther.
overall my expectation is of an infrequent/sparse hit-rate from this
probing, since most producers won't be creating a new snapshot/partition
every/most seconds or every/most minutes. if borne out, it would likely be far
more efficient (although somewhat more complex to implement) targeted directory
child listing, with filtering of the results, as opposed to constructing every
possible concrete embodiment, only to find the vast majority not actually to
exist.
if you feel such concerns are warranted, I'm happy to suggest
simple/efficient implementation for targeted dir listing
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]