Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 5c316d95c -> f00266f06


[GOBBLIN-631] Add option to use timezone for TimeAwareDatasetFinder

Closes #2501 from amarnathkarthik/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f00266f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f00266f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f00266f0

Branch: refs/heads/master
Commit: f00266f06aa0fd762ee4d93cafd6b3f84780e373
Parents: 5c316d9
Author: Karthik Amarnath <[email protected]>
Authored: Thu Nov 15 14:58:27 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Thu Nov 15 14:58:27 2018 -0800

----------------------------------------------------------------------
 .../copy/TimeAwareRecursiveCopyableDataset.java          | 11 ++++++++++-
 .../copy/TimeAwareRecursiveCopyableDatasetTest.java      |  5 +++--
 2 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f00266f0/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
index 41e7ae1..deda014 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.joda.time.DateTimeZone;
 import org.joda.time.LocalDateTime;
 import org.joda.time.Period;
 import org.joda.time.format.DateTimeFormat;
@@ -36,15 +37,20 @@ import org.testng.Assert;
 
 import com.google.common.collect.Lists;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
 public class TimeAwareRecursiveCopyableDataset extends 
RecursiveCopyableDataset {
   private static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + 
".recursive";
   public static final String DATE_PATTERN_KEY = CONFIG_PREFIX + 
".date.pattern";
   public static final String LOOKBACK_TIME_KEY = CONFIG_PREFIX + 
".lookback.time";
+  public static final String DEFAULT_DATE_PATTERN_TIMEZONE = 
ConfigurationKeys.PST_TIMEZONE_NAME;
+  public static final String DATE_PATTERN_TIMEZONE_KEY = CONFIG_PREFIX + 
".datetime.timezone";
 
   private final String lookbackTime;
   private final String datePattern;
   private final Period lookbackPeriod;
   private final boolean isPatternHourly;
+  private final LocalDateTime currentTime;
 
   public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath, 
Properties properties, Path glob) {
     super(fs, rootPath, properties, glob);
@@ -53,6 +59,9 @@ public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset
     this.lookbackPeriod = periodFormatter.parsePeriod(lookbackTime);
     this.datePattern = properties.getProperty(DATE_PATTERN_KEY);
     this.isPatternHourly = isDatePatternHourly(datePattern);
+    this.currentTime = properties.containsKey(DATE_PATTERN_TIMEZONE_KEY) ? 
LocalDateTime.now(
+        DateTimeZone.forID(DATE_PATTERN_TIMEZONE_KEY))
+        : LocalDateTime.now(DateTimeZone.forID(DEFAULT_DATE_PATTERN_TIMEZONE));
 
     //Daily directories cannot have a "hourly" lookback pattern. But hourly 
directories can accept lookback pattern with days.
     if (!this.isPatternHourly) {
@@ -111,7 +120,7 @@ public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset
   @Override
   protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, 
PathFilter fileFilter) throws IOException {
     DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
-    LocalDateTime endDate = LocalDateTime.now();
+    LocalDateTime endDate = currentTime;
     LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
 
     DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, 
endDate, isPatternHourly);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f00266f0/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
index f2ed2cb..1ceb906 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.joda.time.DateTimeZone;
 import org.joda.time.LocalDateTime;
 import org.joda.time.Period;
 import org.joda.time.format.DateTimeFormat;
@@ -83,7 +84,7 @@ public class TimeAwareRecursiveCopyableDatasetTest {
     String datePattern = "yyyy/MM/dd/HH";
     DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
 
-    LocalDateTime endDate = LocalDateTime.now();
+    LocalDateTime endDate = 
LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
 
     Set<String> candidateFiles = new HashSet<>();
     for (int i = 0; i < MAX_NUM_HOURLY_DIRS; i++) {
@@ -141,7 +142,7 @@ public class TimeAwareRecursiveCopyableDatasetTest {
     //Lookback time = "2d"
     datePattern = "yyyy/MM/dd";
     formatter = DateTimeFormat.forPattern(datePattern);
-    endDate = LocalDateTime.now();
+    endDate = 
LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
 
     candidateFiles = new HashSet<>();
     for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {

Reply via email to