Repository: incubator-gobblin Updated Branches: refs/heads/master 5c316d95c -> f00266f06
[GOBBLIN-631] Add option to use timezone for TimeAwareDatasetFinder Closes #2501 from amarnathkarthik/master Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f00266f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f00266f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f00266f0 Branch: refs/heads/master Commit: f00266f06aa0fd762ee4d93cafd6b3f84780e373 Parents: 5c316d9 Author: Karthik Amarnath <[email protected]> Authored: Thu Nov 15 14:58:27 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Thu Nov 15 14:58:27 2018 -0800 ---------------------------------------------------------------------- .../copy/TimeAwareRecursiveCopyableDataset.java | 11 ++++++++++- .../copy/TimeAwareRecursiveCopyableDatasetTest.java | 5 +++-- 2 files changed, 13 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f00266f0/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java index 41e7ae1..deda014 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.joda.time.DateTimeZone; import org.joda.time.LocalDateTime; import org.joda.time.Period; import org.joda.time.format.DateTimeFormat; @@ -36,15 +37,20 @@ import org.testng.Assert; import com.google.common.collect.Lists; +import org.apache.gobblin.configuration.ConfigurationKeys; + public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset { private static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + ".recursive"; public static final String DATE_PATTERN_KEY = CONFIG_PREFIX + ".date.pattern"; public static final String LOOKBACK_TIME_KEY = CONFIG_PREFIX + ".lookback.time"; + public static final String DEFAULT_DATE_PATTERN_TIMEZONE = ConfigurationKeys.PST_TIMEZONE_NAME; + public static final String DATE_PATTERN_TIMEZONE_KEY = CONFIG_PREFIX + ".datetime.timezone"; private final String lookbackTime; private final String datePattern; private final Period lookbackPeriod; private final boolean isPatternHourly; + private final LocalDateTime currentTime; public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath, Properties properties, Path glob) { super(fs, rootPath, properties, glob); @@ -53,6 +59,9 @@ public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset this.lookbackPeriod = periodFormatter.parsePeriod(lookbackTime); this.datePattern = properties.getProperty(DATE_PATTERN_KEY); this.isPatternHourly = isDatePatternHourly(datePattern); + this.currentTime = properties.containsKey(DATE_PATTERN_TIMEZONE_KEY) ? LocalDateTime.now( + DateTimeZone.forID(DATE_PATTERN_TIMEZONE_KEY)) + : LocalDateTime.now(DateTimeZone.forID(DEFAULT_DATE_PATTERN_TIMEZONE)); //Daily directories cannot have a "hourly" lookback pattern. But hourly directories can accept lookback pattern with days. if (!this.isPatternHourly) { @@ -111,7 +120,7 @@ public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset @Override protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException { DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern); - LocalDateTime endDate = LocalDateTime.now(); + LocalDateTime endDate = currentTime; LocalDateTime startDate = endDate.minus(this.lookbackPeriod); DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, isPatternHourly); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f00266f0/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java index f2ed2cb..1ceb906 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.joda.time.DateTimeZone; import org.joda.time.LocalDateTime; import org.joda.time.Period; import org.joda.time.format.DateTimeFormat; @@ -83,7 +84,7 @@ public class TimeAwareRecursiveCopyableDatasetTest { String datePattern = "yyyy/MM/dd/HH"; DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern); - LocalDateTime endDate = LocalDateTime.now(); + LocalDateTime endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE)); Set<String> candidateFiles = new HashSet<>(); for (int i = 0; i < MAX_NUM_HOURLY_DIRS; i++) { @@ -141,7 +142,7 @@ public class TimeAwareRecursiveCopyableDatasetTest { //Lookback time = "2d" datePattern = "yyyy/MM/dd"; formatter = DateTimeFormat.forPattern(datePattern); - endDate = LocalDateTime.now(); + endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE)); candidateFiles = new HashSet<>(); for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
