Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 749b5bd6a -> ef438c872


[GOBBLIN-573] Add option to use finer, hour-level granularity for TimeAwareDatasetFinder

Closes #2438 from sv2000/hourly


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ef438c87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ef438c87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ef438c87

Branch: refs/heads/master
Commit: ef438c872625704b39014741a110f54901c7dfab
Parents: 749b5bd
Author: sv2000 <[email protected]>
Authored: Fri Aug 31 09:08:48 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Fri Aug 31 09:08:48 2018 -0700

----------------------------------------------------------------------
 .../copy/TimeAwareRecursiveCopyableDataset.java |  71 ++++---
 .../management/copy/DateRangeIteratorTest.java  |  25 +--
 .../TimeAwareRecursiveCopyableDatasetTest.java  | 189 +++++++++++++++++++
 3 files changed, 251 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef438c87/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
index 91b5bb4..41e7ae1 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
@@ -17,31 +17,47 @@
 
 package org.apache.gobblin.data.management.copy;
 
-import com.google.common.collect.Lists;
 import java.io.IOException;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.joda.time.LocalDateTime;
+import org.joda.time.Period;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset {
   private static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + ".recursive";
   public static final String DATE_PATTERN_KEY = CONFIG_PREFIX + ".date.pattern";
-  public static final String LOOKBACK_DAYS_KEY = CONFIG_PREFIX + ".lookback.days";
+  public static final String LOOKBACK_TIME_KEY = CONFIG_PREFIX + ".lookback.time";
 
-  private final Integer lookbackDays;
+  private final String lookbackTime;
   private final String datePattern;
+  private final Period lookbackPeriod;
+  private final boolean isPatternHourly;
 
   public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath, Properties properties, Path glob) {
     super(fs, rootPath, properties, glob);
-    this.lookbackDays = Integer.parseInt(properties.getProperty(LOOKBACK_DAYS_KEY));
+    this.lookbackTime = Preconditions.checkNotNull(properties.getProperty(LOOKBACK_TIME_KEY), "Missing required property %s", LOOKBACK_TIME_KEY);
+    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+    this.lookbackPeriod = periodFormatter.parsePeriod(lookbackTime);
     this.datePattern = properties.getProperty(DATE_PATTERN_KEY);
+    this.isPatternHourly = isDatePatternHourly(datePattern);
+    // Daily directories cannot take an hour-granular lookback (e.g. "1d1h"); hourly directories also accept a day-only lookback (e.g. "2d").
+    if (!this.isPatternHourly && !isLookbackTimeStringDaily(this.lookbackTime)) {
+      // AssertionError is what the former TestNG Assert threw; TestNG is a test-only dependency and must not be used here.
+      throw new AssertionError("Expected day format for lookback time; found hourly format");
+    }
   }
 
-  public static class DateRangeIterator implements Iterator {
+  public static class DateRangeIterator implements Iterator<LocalDateTime> {
@@ -49,19 +65,10 @@ public class TimeAwareRecursiveCopyableDataset extends 
RecursiveCopyableDataset
     private LocalDateTime endDate;
     private boolean isDatePatternHourly;
 
-    public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate, 
String datePattern) {
+    public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate, 
boolean isDatePatternHourly) { // true: next() steps by one hour; false: by one day
       this.startDate = startDate;
       this.endDate = endDate;
-      this.isDatePatternHourly = isDatePatternHourly(datePattern);
-    }
-
-    private boolean isDatePatternHourly(String datePattern) {
-      DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern);
-      LocalDateTime refDateTime = LocalDateTime.of(2017, 01, 01, 10, 0, 0);
-      String refDateTimeString = refDateTime.format(formatter);
-      LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHour(0);
-      String refDateTimeStringAtStartOfDay = 
refDateTimeAtStartOfDay.format(formatter);
-      return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
+      this.isDatePatternHourly = isDatePatternHourly; // granularity is now supplied by the caller instead of derived from a date pattern here
     }
 
     @Override
@@ -72,7 +79,7 @@ public class TimeAwareRecursiveCopyableDataset extends 
RecursiveCopyableDataset
     @Override
     public LocalDateTime next() {
       LocalDateTime dateTime = startDate;
-      startDate = isDatePatternHourly ? startDate.plusHours(1) : 
startDate.plusDays(1);
+      startDate = this.isDatePatternHourly ? startDate.plusHours(1) : 
startDate.plusDays(1); // advance the cursor by one partition; the pre-advance value is returned
       return dateTime;
     }
 
@@ -82,15 +89,45 @@ public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset
     }
   }
 
+  /**
+   * Returns true if {@code datePattern} formats 10:00 and 00:00 of the same day differently, i.e. the
+   * pattern contains an hour-of-day component and partitions are laid out hourly.
+   */
+  private static boolean isDatePatternHourly(String datePattern) {
+    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+    LocalDateTime refDateTime = new LocalDateTime(2017, 1, 1, 10, 0, 0);
+    String refDateTimeString = refDateTime.toString(formatter);
+    LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
+    String refDateTimeStringAtStartOfDay = refDateTimeAtStartOfDay.toString(formatter);
+    return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
+  }
+
+  /**
+   * Returns true if {@code lookbackTime} parses with a days-only format (e.g. "2d"); returns false when it
+   * carries any other component, such as hours (e.g. "1d1h").
+   */
+  private static boolean isLookbackTimeStringDaily(String lookbackTime) {
+    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
+    try {
+      periodFormatter.parsePeriod(lookbackTime);
+      return true;
+    } catch (IllegalArgumentException e) {
+      // parsePeriod reports text the days-only format cannot fully consume with IllegalArgumentException.
+      return false;
+    }
+  }
+
   @Override
   protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
-    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern);
+    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
     LocalDateTime endDate = LocalDateTime.now();
-    LocalDateTime startDate = endDate.minusDays(lookbackDays);
-    DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, datePattern);
+    LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
+
+    // Visit one partition per hour (hourly pattern) or per day (daily pattern), from startDate through endDate.
+    DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, isPatternHourly);
     List<FileStatus> fileStatuses = Lists.newArrayList();
     while (dateRangeIterator.hasNext()) {
-      Path pathWithDateTime = new Path(path, dateRangeIterator.next().format(formatter));
+      Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
       fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
     }
     return fileStatuses;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef438c87/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
index 49d5408..3c3b312 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
@@ -17,8 +17,9 @@
 
 package org.apache.gobblin.data.management.copy;
 
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
+import org.joda.time.LocalDateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -29,28 +30,28 @@ public class DateRangeIteratorTest {
 
   @Test
   public void testIterator() {
-    LocalDateTime endDate = LocalDateTime.of(2017, 1, 1, 0, 0, 0);
+    LocalDateTime endDate = new LocalDateTime(2017, 1, 1, 0, 0, 0); // Joda-Time constructor replaces java.time's LocalDateTime.of
     LocalDateTime startDate = endDate.minusHours(2);
     String datePattern = "HH/yyyy/MM/dd";
-    DateTimeFormatter format = DateTimeFormatter.ofPattern(datePattern);
+    DateTimeFormatter format = DateTimeFormat.forPattern(datePattern);
     TimeAwareRecursiveCopyableDataset.DateRangeIterator dateRangeIterator =
-        new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, 
endDate, datePattern);
+        new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, 
endDate, true); // true: "HH/yyyy/MM/dd" has an hour field, so iterate hour by hour
     LocalDateTime dateTime = dateRangeIterator.next();
-    Assert.assertEquals(dateTime.format(format), "22/2016/12/31");
+    Assert.assertEquals(dateTime.toString(format), "22/2016/12/31");
     dateTime = dateRangeIterator.next();
-    Assert.assertEquals(dateTime.format(format), "23/2016/12/31");
+    Assert.assertEquals(dateTime.toString(format), "23/2016/12/31");
     dateTime = dateRangeIterator.next();
-    Assert.assertEquals(dateTime.format(format), "00/2017/01/01");
+    Assert.assertEquals(dateTime.toString(format), "00/2017/01/01");
     Assert.assertEquals(dateRangeIterator.hasNext(), false);
 
     datePattern = "yyyy/MM/dd";
-    format = DateTimeFormatter.ofPattern(datePattern);
+    format = DateTimeFormat.forPattern(datePattern);
     startDate = endDate.minusDays(1);
-    dateRangeIterator = new 
TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, 
datePattern);
+    dateRangeIterator = new 
TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, false); // false: "yyyy/MM/dd" is a daily layout, so iterate day by day
     dateTime = dateRangeIterator.next();
-    Assert.assertEquals(dateTime.format(format), "2016/12/31");
+    Assert.assertEquals(dateTime.toString(format), "2016/12/31");
     dateTime = dateRangeIterator.next();
-    Assert.assertEquals(dateTime.format(format), "2017/01/01");
+    Assert.assertEquals(dateTime.toString(format), "2017/01/01");
     Assert.assertEquals(dateRangeIterator.hasNext(), false);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef438c87/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
new file mode 100644
index 0000000..f2ed2cb
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.joda.time.DateTimeUtils;
+import org.joda.time.LocalDateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filters.HiddenFilter;
+
+/**
+ * Tests {@link TimeAwareRecursiveCopyableDataset#getFilesAtPath} against hourly and daily partition layouts,
+ * and checks that an hour-granular lookback is rejected for a daily date pattern.
+ */
+public class TimeAwareRecursiveCopyableDatasetTest {
+  private FileSystem fs;
+  private Path baseDir1;
+  private Path baseDir2;
+
+  private static final String NUM_LOOKBACK_DAYS_STR = "2d";
+  private static final Integer NUM_LOOKBACK_DAYS = 2;
+  private static final String NUM_LOOKBACK_HOURS_STR = "4h";
+  private static final Integer NUM_LOOKBACK_HOURS = 4;
+  private static final Integer MAX_NUM_DAILY_DIRS = 4;
+  private static final Integer MAX_NUM_HOURLY_DIRS = 48;
+  private static final String NUM_LOOKBACK_DAYS_HOURS_STR = "1d1h";
+  private static final Integer NUM_DAYS_HOURS_DIRS = 25;
+
+  @BeforeClass
+  public void setUp() throws IOException {
+    Assert.assertTrue(NUM_LOOKBACK_DAYS < MAX_NUM_DAILY_DIRS);
+    Assert.assertTrue(NUM_LOOKBACK_HOURS < MAX_NUM_HOURLY_DIRS);
+
+    this.fs = FileSystem.getLocal(new Configuration());
+
+    baseDir1 = new Path("/tmp/src/ds1/hourly");
+    if (fs.exists(baseDir1)) {
+      fs.delete(baseDir1, true);
+    }
+    fs.mkdirs(baseDir1);
+
+    baseDir2 = new Path("/tmp/src/ds1/daily");
+    if (fs.exists(baseDir2)) {
+      fs.delete(baseDir2, true);
+    }
+    fs.mkdirs(baseDir2);
+
+    // Freeze Joda-Time's clock so LocalDateTime.now() here and inside getFilesAtPath() agree; otherwise
+    // crossing an hour/day boundary mid-test shifts the lookback window and the test flakes. Reset in clean().
+    DateTimeUtils.setCurrentMillisFixed(DateTimeUtils.currentTimeMillis());
+  }
+
+  @Test
+  public void testGetFilesAtPath() throws IOException {
+    String datePattern = "yyyy/MM/dd/HH";
+    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+
+    LocalDateTime endDate = LocalDateTime.now();
+
+    Set<String> candidateFiles = new HashSet<>();
+    for (int i = 0; i < MAX_NUM_HOURLY_DIRS; i++) {
+      String startDate = endDate.minusHours(i).toString(formatter);
+      Path subDirPath = new Path(baseDir1, new Path(startDate));
+      fs.mkdirs(subDirPath);
+      Path filePath = new Path(subDirPath, i + ".avro");
+      // Only the file's existence matters; close the returned stream so no file handle is leaked.
+      fs.create(filePath).close();
+      if (i < (NUM_LOOKBACK_HOURS + 1)) {
+        candidateFiles.add(filePath.toString());
+      }
+    }
+
+    //Lookback time = "4h"
+    Properties properties = new Properties();
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_HOURS_STR);
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd/HH");
+
+    PathFilter pathFilter = new HiddenFilter();
+    TimeAwareRecursiveCopyableDataset dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir1, properties,
+        new Path("/tmp/src/*/hourly"));
+    List<FileStatus> fileStatusList = dataset.getFilesAtPath(fs, baseDir1, pathFilter);
+
+    Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_HOURS + 1);
+
+    for (FileStatus fileStatus: fileStatusList) {
+      Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
+    }
+
+    //Lookback time = "1d1h"
+    properties = new Properties();
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_HOURS_STR);
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd/HH");
+    dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir1, properties,
+        new Path("/tmp/src/*/hourly"));
+    fileStatusList = dataset.getFilesAtPath(fs, baseDir1, pathFilter);
+    candidateFiles = new HashSet<>();
+
+    // Reuse the hourly files created above; "1d1h" covers the current hour plus the 25 preceding ones.
+    for (int i = 0; i < MAX_NUM_HOURLY_DIRS; i++) {
+      String startDate = endDate.minusHours(i).toString(formatter);
+      Path subDirPath = new Path(baseDir1, new Path(startDate));
+      Path filePath = new Path(subDirPath, i + ".avro");
+      if (i < NUM_DAYS_HOURS_DIRS + 1) {
+        candidateFiles.add(filePath.toString());
+      }
+    }
+
+    Assert.assertEquals(fileStatusList.size(), NUM_DAYS_HOURS_DIRS + 1);
+    for (FileStatus fileStatus: fileStatusList) {
+      Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
+    }
+
+    //Lookback time = "2d"
+    datePattern = "yyyy/MM/dd";
+    formatter = DateTimeFormat.forPattern(datePattern);
+    endDate = LocalDateTime.now();
+
+    candidateFiles = new HashSet<>();
+    for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
+      String startDate = endDate.minusDays(i).toString(formatter);
+      Path subDirPath = new Path(baseDir2, new Path(startDate));
+      fs.mkdirs(subDirPath);
+      Path filePath = new Path(subDirPath, i + ".avro");
+      fs.create(filePath).close();
+      if (i < (NUM_LOOKBACK_DAYS + 1)) {
+        candidateFiles.add(filePath.toString());
+      }
+    }
+
+    properties = new Properties();
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_STR);
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd");
+
+    dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir2, properties,
+        new Path("/tmp/src/*/daily"));
+    fileStatusList = dataset.getFilesAtPath(fs, baseDir2, pathFilter);
+
+    Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_DAYS + 1);
+    for (FileStatus fileStatus: fileStatusList) {
+      Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
+    }
+  }
+
+  @Test (expectedExceptions = AssertionError.class)
+  public void testInstantiationError() {
+    //Daily directories, but look back time has days and hours. We should expect an assertion error.
+    Properties properties = new Properties();
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_HOURS_STR);
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd");
+
+    new TimeAwareRecursiveCopyableDataset(fs, baseDir2, properties,
+        new Path("/tmp/src/*/daily"));
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void clean() throws IOException {
+    // Restore the system clock frozen at the end of setUp(), before anything here can throw.
+    DateTimeUtils.setCurrentMillisSystem();
+    //Delete tmp directories
+    this.fs.delete(baseDir1, true);
+    this.fs.delete(baseDir2, true);
+  }
+}
\ No newline at end of file

Reply via email to