This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5b8af8c [GOBBLIN-888] Make yyyy-MM-dd-HH-mm recognizable in
TimeAwareRecursiveCopyableDataset
5b8af8c is described below
commit 5b8af8cf6e932e8031ef060c79260a4a8f47974c
Author: welin <[email protected]>
AuthorDate: Wed Oct 2 16:24:46 2019 -0700
[GOBBLIN-888] Make yyyy-MM-dd-HH-mm recognizable in
TimeAwareRecursiveCopyableDataset
Closes #2744 from linweihs/time2
---
.../copy/TimeAwareRecursiveCopyableDataset.java | 66 ++++++++++++++++++----
.../management/copy/DateRangeIteratorTest.java | 14 ++++-
.../TimeAwareRecursiveCopyableDatasetTest.java | 60 +++++++++++++++++++-
3 files changed, 127 insertions(+), 13 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
index deda014..56f772a 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.data.management.copy;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@@ -49,35 +50,49 @@ public class TimeAwareRecursiveCopyableDataset extends
RecursiveCopyableDataset
private final String lookbackTime;
private final String datePattern;
private final Period lookbackPeriod;
+ private final boolean isPatternDaily;
private final boolean isPatternHourly;
+ private final boolean isPatternMinutely;
private final LocalDateTime currentTime;
+ private final DatePattern pattern;
+
+ enum DatePattern {
+ MINUTELY, HOURLY, DAILY
+ }
public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath,
Properties properties, Path glob) {
super(fs, rootPath, properties, glob);
this.lookbackTime = properties.getProperty(LOOKBACK_TIME_KEY);
- PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+ PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").toFormatter();
this.lookbackPeriod = periodFormatter.parsePeriod(lookbackTime);
this.datePattern = properties.getProperty(DATE_PATTERN_KEY);
- this.isPatternHourly = isDatePatternHourly(datePattern);
+ this.isPatternMinutely = isDatePatternMinutely(datePattern);
+ this.isPatternHourly = !this.isPatternMinutely &&
isDatePatternHourly(datePattern);
+ this.isPatternDaily = !this.isPatternMinutely && !this.isPatternHourly;
this.currentTime = properties.containsKey(DATE_PATTERN_TIMEZONE_KEY) ?
LocalDateTime.now(
DateTimeZone.forID(DATE_PATTERN_TIMEZONE_KEY))
: LocalDateTime.now(DateTimeZone.forID(DEFAULT_DATE_PATTERN_TIMEZONE));
- //Daily directories cannot have a "hourly" lookback pattern. But hourly
directories can accept lookback pattern with days.
- if (!this.isPatternHourly) {
- Assert.assertTrue(isLookbackTimeStringDaily(this.lookbackTime),
"Expected day format for lookback time; found hourly format");
+ if (this.isPatternDaily) {
+
Preconditions.checkArgument(isLookbackTimeStringDaily(this.lookbackTime),
"Expected day format for lookback time; found hourly or minutely format");
+ pattern = DatePattern.DAILY;
+ } else if (this.isPatternHourly) {
+
Preconditions.checkArgument(isLookbackTimeStringHourly(this.lookbackTime),
"Expected hourly format for lookback time; found minutely format");
+ pattern = DatePattern.HOURLY;
+ } else {
+ pattern = DatePattern.MINUTELY;
}
}
public static class DateRangeIterator implements Iterator {
private LocalDateTime startDate;
private LocalDateTime endDate;
- private boolean isDatePatternHourly;
+ private DatePattern datePattern;
- public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate,
boolean isDatePatternHourly) {
+ public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate,
DatePattern datePattern) {
this.startDate = startDate;
this.endDate = endDate;
- this.isDatePatternHourly = isDatePatternHourly;
+ this.datePattern = datePattern;
}
@Override
@@ -88,7 +103,19 @@ public class TimeAwareRecursiveCopyableDataset extends
RecursiveCopyableDataset
@Override
public LocalDateTime next() {
LocalDateTime dateTime = startDate;
- startDate = this.isDatePatternHourly ? startDate.plusHours(1) :
startDate.plusDays(1);
+
+ switch (datePattern) {
+ case MINUTELY:
+ startDate = startDate.plusMinutes(1);
+ break;
+ case HOURLY:
+ startDate = startDate.plusHours(1);
+ break;
+ case DAILY:
+ startDate = startDate.plusDays(1);
+ break;
+ }
+
return dateTime;
}
@@ -107,6 +134,15 @@ public class TimeAwareRecursiveCopyableDataset extends
RecursiveCopyableDataset
return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
}
+ private boolean isDatePatternMinutely(String datePattern) {
+ DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+ LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
+ String refDateTimeString = refDateTime.toString(formatter);
+ LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
+ String refDateTimeStringAtStartOfHour =
refDateTimeAtStartOfHour.toString(formatter);
+ return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
+ }
+
private boolean isLookbackTimeStringDaily(String lookbackTime) {
PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
try {
@@ -117,13 +153,23 @@ public class TimeAwareRecursiveCopyableDataset extends
RecursiveCopyableDataset
}
}
+ private boolean isLookbackTimeStringHourly(String lookbackTime) {
+ PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+ try {
+ periodFormatter.parsePeriod(lookbackTime);
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
@Override
protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path,
PathFilter fileFilter) throws IOException {
DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
LocalDateTime endDate = currentTime;
LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
- DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate,
endDate, isPatternHourly);
+ DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate,
endDate, pattern);
List<FileStatus> fileStatuses = Lists.newArrayList();
while (dateRangeIterator.hasNext()) {
Path pathWithDateTime = new Path(path,
dateRangeIterator.next().toString(formatter));
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
index 3c3b312..47e72d6 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
@@ -35,7 +35,7 @@ public class DateRangeIteratorTest {
String datePattern = "HH/yyyy/MM/dd";
DateTimeFormatter format = DateTimeFormat.forPattern(datePattern);
TimeAwareRecursiveCopyableDataset.DateRangeIterator dateRangeIterator =
- new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate,
endDate, true);
+ new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate,
endDate, TimeAwareRecursiveCopyableDataset.DatePattern.HOURLY);
LocalDateTime dateTime = dateRangeIterator.next();
Assert.assertEquals(dateTime.toString(format), "22/2016/12/31");
dateTime = dateRangeIterator.next();
@@ -47,11 +47,21 @@ public class DateRangeIteratorTest {
datePattern = "yyyy/MM/dd";
format = DateTimeFormat.forPattern(datePattern);
startDate = endDate.minusDays(1);
- dateRangeIterator = new
TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, false);
+ dateRangeIterator = new
TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate,
TimeAwareRecursiveCopyableDataset.DatePattern.DAILY);
dateTime = dateRangeIterator.next();
Assert.assertEquals(dateTime.toString(format), "2016/12/31");
dateTime = dateRangeIterator.next();
Assert.assertEquals(dateTime.toString(format), "2017/01/01");
Assert.assertEquals(dateRangeIterator.hasNext(), false);
+
+ datePattern = "yyyy-MM-dd-HH-mm";
+ format = DateTimeFormat.forPattern(datePattern);
+ startDate = endDate.minusHours(1);
+ dateRangeIterator = new
TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate,
TimeAwareRecursiveCopyableDataset.DatePattern.MINUTELY);
+ dateTime = dateRangeIterator.next();
+ Assert.assertEquals(dateTime.toString(format), "2016-12-31-23-00");
+ dateTime = dateRangeIterator.next();
+ Assert.assertEquals(dateTime.toString(format), "2016-12-31-23-01");
+ Assert.assertEquals(dateRangeIterator.hasNext(), true);
}
}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
index 1ceb906..b684936 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
+import java.util.Random;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -43,10 +44,12 @@ import org.testng.annotations.Test;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.filters.HiddenFilter;
+
public class TimeAwareRecursiveCopyableDatasetTest {
private FileSystem fs;
private Path baseDir1;
private Path baseDir2;
+ private Path baseDir3;
private static final String NUM_LOOKBACK_DAYS_STR = "2d";
private static final Integer NUM_LOOKBACK_DAYS = 2;
@@ -56,6 +59,7 @@ public class TimeAwareRecursiveCopyableDatasetTest {
private static final Integer MAX_NUM_HOURLY_DIRS = 48;
private static final String NUM_LOOKBACK_DAYS_HOURS_STR = "1d1h";
private static final Integer NUM_DAYS_HOURS_DIRS = 25;
+ private static final String NUM_LOOKBACK_HOURS_MINS_STR = "1h1m";
@BeforeClass
public void setUp() throws IOException {
@@ -75,6 +79,12 @@ public class TimeAwareRecursiveCopyableDatasetTest {
fs.delete(baseDir2, true);
}
fs.mkdirs(baseDir2);
+
+ baseDir3 = new Path("/tmp/src/ds2/daily");
+ if (fs.exists(baseDir3)) {
+ fs.delete(baseDir3, true);
+ }
+ fs.mkdirs(baseDir3);
PeriodFormatter formatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
Period period = formatter.parsePeriod(NUM_LOOKBACK_DAYS_HOURS_STR);
}
@@ -168,9 +178,47 @@ public class TimeAwareRecursiveCopyableDatasetTest {
for (FileStatus fileStatus: fileStatusList) {
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
}
+
+ // test ds of daily/yyyy-MM-dd-HH-mm
+ datePattern = "yyyy-MM-dd-HH-mm";
+ formatter = DateTimeFormat.forPattern(datePattern);
+ endDate =
LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+
+ Random random = new Random();
+
+ candidateFiles = new HashSet<>();
+ for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
+ String startDate =
endDate.minusDays(i).withMinuteOfHour(random.nextInt(60)).toString(formatter);
+ if (i == 0) {
+ // avoid future dates on minutes, so that the test result is consistent
+ startDate =
endDate.minusHours(i).withMinuteOfHour(0).toString(formatter);
+ }
+ Path subDirPath = new Path(baseDir3, new Path(startDate));
+ fs.mkdirs(subDirPath);
+ Path filePath = new Path(subDirPath, i + ".avro");
+ fs.create(filePath);
+ if (i < (NUM_LOOKBACK_DAYS + 1)) {
+ candidateFiles.add(filePath.toString());
+ }
+ }
+
+ properties = new Properties();
+
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY,
"2d1h");
+ properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY,
"yyyy-MM-dd-HH-mm");
+
+ dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir3, properties,
+ new Path("/tmp/src/ds2/daily"));
+
+ fileStatusList = dataset.getFilesAtPath(fs, baseDir3, pathFilter);
+
+ Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_DAYS + 1);
+ for (FileStatus fileStatus: fileStatusList) {
+
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
+ }
+
}
- @Test (expectedExceptions = AssertionError.class)
+ @Test (expectedExceptions = IllegalArgumentException.class)
public void testInstantiationError() {
//Daily directories, but look back time has days and hours. We should
expect an assertion error.
Properties properties = new Properties();
@@ -179,6 +227,15 @@ public class TimeAwareRecursiveCopyableDatasetTest {
TimeAwareRecursiveCopyableDataset dataset = new
TimeAwareRecursiveCopyableDataset(fs, baseDir2, properties,
new Path("/tmp/src/*/daily"));
+
+ // Hourly directories, but lookback time has hours and minutes. We should
expect an IllegalArgumentException.
+ properties = new Properties();
+
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY,
NUM_LOOKBACK_HOURS_MINS_STR);
+ properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY,
"yyyy-MM-dd-HH");
+
+ dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir3, properties,
+ new Path("/tmp/src/ds2/daily"));
+
}
@AfterClass
@@ -186,5 +243,6 @@ public class TimeAwareRecursiveCopyableDatasetTest {
//Delete tmp directories
this.fs.delete(baseDir1, true);
this.fs.delete(baseDir2, true);
+ this.fs.delete(baseDir3, true);
}
}
\ No newline at end of file