This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b8af8c  [GOBBLIN-888] Make yyyy-MM-dd-HH-mm recognizable in 
TimeAwareRecursiveCopyableDataset
5b8af8c is described below

commit 5b8af8cf6e932e8031ef060c79260a4a8f47974c
Author: welin <[email protected]>
AuthorDate: Wed Oct 2 16:24:46 2019 -0700

    [GOBBLIN-888] Make yyyy-MM-dd-HH-mm recognizable in 
TimeAwareRecursiveCopyableDataset
    
    Closes #2744 from linweihs/time2
---
 .../copy/TimeAwareRecursiveCopyableDataset.java    | 66 ++++++++++++++++++----
 .../management/copy/DateRangeIteratorTest.java     | 14 ++++-
 .../TimeAwareRecursiveCopyableDatasetTest.java     | 60 +++++++++++++++++++-
 3 files changed, 127 insertions(+), 13 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
index deda014..56f772a 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.data.management.copy;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -49,35 +50,49 @@ public class TimeAwareRecursiveCopyableDataset extends 
RecursiveCopyableDataset
   private final String lookbackTime;
   private final String datePattern;
   private final Period lookbackPeriod;
+  private final boolean isPatternDaily;
   private final boolean isPatternHourly;
+  private final boolean isPatternMinutely;
   private final LocalDateTime currentTime;
+  private final DatePattern pattern;
+
+  enum DatePattern {
+    MINUTELY, HOURLY, DAILY
+  }
 
   public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath, 
Properties properties, Path glob) {
     super(fs, rootPath, properties, glob);
     this.lookbackTime = properties.getProperty(LOOKBACK_TIME_KEY);
-    PeriodFormatter periodFormatter = new 
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+    PeriodFormatter periodFormatter = new 
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").toFormatter();
     this.lookbackPeriod = periodFormatter.parsePeriod(lookbackTime);
     this.datePattern = properties.getProperty(DATE_PATTERN_KEY);
-    this.isPatternHourly = isDatePatternHourly(datePattern);
+    this.isPatternMinutely = isDatePatternMinutely(datePattern);
+    this.isPatternHourly = !this.isPatternMinutely && 
isDatePatternHourly(datePattern);
+    this.isPatternDaily = !this.isPatternMinutely && !this.isPatternHourly;
     this.currentTime = properties.containsKey(DATE_PATTERN_TIMEZONE_KEY) ? 
LocalDateTime.now(
         DateTimeZone.forID(DATE_PATTERN_TIMEZONE_KEY))
         : LocalDateTime.now(DateTimeZone.forID(DEFAULT_DATE_PATTERN_TIMEZONE));
 
-    //Daily directories cannot have a "hourly" lookback pattern. But hourly 
directories can accept lookback pattern with days.
-    if (!this.isPatternHourly) {
-      Assert.assertTrue(isLookbackTimeStringDaily(this.lookbackTime), 
"Expected day format for lookback time; found hourly format");
+    if (this.isPatternDaily) {
+      
Preconditions.checkArgument(isLookbackTimeStringDaily(this.lookbackTime), 
"Expected day format for lookback time; found hourly or minutely format");
+      pattern = DatePattern.DAILY;
+    } else if (this.isPatternHourly) {
+      
Preconditions.checkArgument(isLookbackTimeStringHourly(this.lookbackTime), 
"Expected hourly format for lookback time; found minutely format");
+      pattern = DatePattern.HOURLY;
+    } else {
+      pattern = DatePattern.MINUTELY;
     }
   }
 
   public static class DateRangeIterator implements Iterator {
     private LocalDateTime startDate;
     private LocalDateTime endDate;
-    private boolean isDatePatternHourly;
+    private DatePattern datePattern;
 
-    public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate, 
boolean isDatePatternHourly) {
+    public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate, 
DatePattern datePattern) {
       this.startDate = startDate;
       this.endDate = endDate;
-      this.isDatePatternHourly = isDatePatternHourly;
+      this.datePattern = datePattern;
     }
 
     @Override
@@ -88,7 +103,19 @@ public class TimeAwareRecursiveCopyableDataset extends 
RecursiveCopyableDataset
     @Override
     public LocalDateTime next() {
       LocalDateTime dateTime = startDate;
-      startDate = this.isDatePatternHourly ? startDate.plusHours(1) : 
startDate.plusDays(1);
+
+      switch (datePattern) {
+        case MINUTELY:
+          startDate = startDate.plusMinutes(1);
+          break;
+        case HOURLY:
+          startDate = startDate.plusHours(1);
+          break;
+        case DAILY:
+          startDate = startDate.plusDays(1);
+          break;
+      }
+
       return dateTime;
     }
 
@@ -107,6 +134,15 @@ public class TimeAwareRecursiveCopyableDataset extends 
RecursiveCopyableDataset
     return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
   }
 
+  private boolean isDatePatternMinutely(String datePattern) {
+    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+    LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
+    String refDateTimeString = refDateTime.toString(formatter);
+    LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
+    String refDateTimeStringAtStartOfHour = 
refDateTimeAtStartOfHour.toString(formatter);
+    return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
+  }
+
   private boolean isLookbackTimeStringDaily(String lookbackTime) {
     PeriodFormatter periodFormatter = new 
PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
     try {
@@ -117,13 +153,23 @@ public class TimeAwareRecursiveCopyableDataset extends 
RecursiveCopyableDataset
     }
   }
 
+  private boolean isLookbackTimeStringHourly(String lookbackTime) {
+    PeriodFormatter periodFormatter = new 
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+    try {
+      periodFormatter.parsePeriod(lookbackTime);
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
   @Override
   protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, 
PathFilter fileFilter) throws IOException {
     DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
     LocalDateTime endDate = currentTime;
     LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
 
-    DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, 
endDate, isPatternHourly);
+    DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, 
endDate, pattern);
     List<FileStatus> fileStatuses = Lists.newArrayList();
     while (dateRangeIterator.hasNext()) {
       Path pathWithDateTime = new Path(path, 
dateRangeIterator.next().toString(formatter));
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
index 3c3b312..47e72d6 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
@@ -35,7 +35,7 @@ public class DateRangeIteratorTest {
     String datePattern = "HH/yyyy/MM/dd";
     DateTimeFormatter format = DateTimeFormat.forPattern(datePattern);
     TimeAwareRecursiveCopyableDataset.DateRangeIterator dateRangeIterator =
-        new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, 
endDate, true);
+        new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, 
endDate, TimeAwareRecursiveCopyableDataset.DatePattern.HOURLY);
     LocalDateTime dateTime = dateRangeIterator.next();
     Assert.assertEquals(dateTime.toString(format), "22/2016/12/31");
     dateTime = dateRangeIterator.next();
@@ -47,11 +47,21 @@ public class DateRangeIteratorTest {
     datePattern = "yyyy/MM/dd";
     format = DateTimeFormat.forPattern(datePattern);
     startDate = endDate.minusDays(1);
-    dateRangeIterator = new 
TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, false);
+    dateRangeIterator = new 
TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, 
TimeAwareRecursiveCopyableDataset.DatePattern.DAILY);
     dateTime = dateRangeIterator.next();
     Assert.assertEquals(dateTime.toString(format), "2016/12/31");
     dateTime = dateRangeIterator.next();
     Assert.assertEquals(dateTime.toString(format), "2017/01/01");
     Assert.assertEquals(dateRangeIterator.hasNext(), false);
+
+    datePattern = "yyyy-MM-dd-HH-mm";
+    format = DateTimeFormat.forPattern(datePattern);
+    startDate = endDate.minusHours(1);
+    dateRangeIterator = new 
TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, 
TimeAwareRecursiveCopyableDataset.DatePattern.MINUTELY);
+    dateTime = dateRangeIterator.next();
+    Assert.assertEquals(dateTime.toString(format), "2016-12-31-23-00");
+    dateTime = dateRangeIterator.next();
+    Assert.assertEquals(dateTime.toString(format), "2016-12-31-23-01");
+    Assert.assertEquals(dateRangeIterator.hasNext(), true);
   }
 }
\ No newline at end of file
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
index 1ceb906..b684936 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Random;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -43,10 +44,12 @@ import org.testng.annotations.Test;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.filters.HiddenFilter;
 
+
 public class TimeAwareRecursiveCopyableDatasetTest {
   private FileSystem fs;
   private Path baseDir1;
   private Path baseDir2;
+  private Path baseDir3;
 
   private static final String NUM_LOOKBACK_DAYS_STR = "2d";
   private static final Integer NUM_LOOKBACK_DAYS = 2;
@@ -56,6 +59,7 @@ public class TimeAwareRecursiveCopyableDatasetTest {
   private static final Integer MAX_NUM_HOURLY_DIRS = 48;
   private static final String NUM_LOOKBACK_DAYS_HOURS_STR = "1d1h";
   private static final Integer NUM_DAYS_HOURS_DIRS = 25;
+  private static final String NUM_LOOKBACK_HOURS_MINS_STR = "1h1m";
 
   @BeforeClass
   public void setUp() throws IOException {
@@ -75,6 +79,12 @@ public class TimeAwareRecursiveCopyableDatasetTest {
       fs.delete(baseDir2, true);
     }
     fs.mkdirs(baseDir2);
+
+    baseDir3 = new Path("/tmp/src/ds2/daily");
+    if (fs.exists(baseDir3)) {
+      fs.delete(baseDir3, true);
+    }
+    fs.mkdirs(baseDir3);
     PeriodFormatter formatter = new 
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
     Period period = formatter.parsePeriod(NUM_LOOKBACK_DAYS_HOURS_STR);
   }
@@ -168,9 +178,47 @@ public class TimeAwareRecursiveCopyableDatasetTest {
     for (FileStatus fileStatus: fileStatusList) {
       
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
     }
+
+    // test ds of daily/yyyy-MM-dd-HH-mm
+    datePattern = "yyyy-MM-dd-HH-mm";
+    formatter = DateTimeFormat.forPattern(datePattern);
+    endDate = 
LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+
+    Random random = new Random();
+
+    candidateFiles = new HashSet<>();
+    for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
+      String startDate = 
endDate.minusDays(i).withMinuteOfHour(random.nextInt(60)).toString(formatter);
+      if (i == 0) {
+        // avoid future dates on minutes, so have consistency test result
+        startDate = 
endDate.minusHours(i).withMinuteOfHour(0).toString(formatter);
+      }
+      Path subDirPath = new Path(baseDir3, new Path(startDate));
+      fs.mkdirs(subDirPath);
+      Path filePath = new Path(subDirPath, i + ".avro");
+      fs.create(filePath);
+      if (i < (NUM_LOOKBACK_DAYS + 1)) {
+        candidateFiles.add(filePath.toString());
+      }
+    }
+
+    properties = new Properties();
+    
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, 
"2d1h");
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, 
"yyyy-MM-dd-HH-mm");
+
+    dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir3, properties,
+        new Path("/tmp/src/ds2/daily"));
+
+    fileStatusList = dataset.getFilesAtPath(fs, baseDir3, pathFilter);
+
+    Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_DAYS + 1);
+    for (FileStatus fileStatus: fileStatusList) {
+      
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
+    }
+
   }
 
-  @Test (expectedExceptions = AssertionError.class)
+  @Test (expectedExceptions = IllegalArgumentException.class)
   public void testInstantiationError() {
     //Daily directories, but look back time has days and hours. We should 
expect an assertion error.
     Properties properties = new Properties();
@@ -179,6 +227,15 @@ public class TimeAwareRecursiveCopyableDatasetTest {
 
     TimeAwareRecursiveCopyableDataset dataset = new 
TimeAwareRecursiveCopyableDataset(fs, baseDir2, properties,
         new Path("/tmp/src/*/daily"));
+
+    // hourly directories, but look back time has hours and minutes. We should 
expect an assertion error.
+    properties = new Properties();
+    
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, 
NUM_LOOKBACK_HOURS_MINS_STR);
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, 
"yyyy-MM-dd-HH");
+
+    dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir3, properties,
+        new Path("/tmp/src/ds2/daily"));
+
   }
 
   @AfterClass
@@ -186,5 +243,6 @@ public class TimeAwareRecursiveCopyableDatasetTest {
     //Delete tmp directories
     this.fs.delete(baseDir1, true);
     this.fs.delete(baseDir2, true);
+    this.fs.delete(baseDir3, true);
   }
 }
\ No newline at end of file

Reply via email to