Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 55dafd67e -> ece2858ec


[GOBBLIN-365] Add lookback days config property for CopyableGlobDatasetFinder

Closes #2238 from sv2000/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ece2858e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ece2858e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ece2858e

Branch: refs/heads/master
Commit: ece2858ec60a2616390e6c72ffdc30d7bca2a2ff
Parents: 55dafd6
Author: suvasude <[email protected]>
Authored: Thu Jan 11 12:02:11 2018 -0800
Committer: Issac Buenrostro <[email protected]>
Committed: Thu Jan 11 12:02:11 2018 -0800

----------------------------------------------------------------------
 .../TimeAwareCopyableGlobDatasetFinder.java     | 41 ++++++++
 .../copy/TimeAwareRecursiveCopyableDataset.java | 98 ++++++++++++++++++++
 .../management/copy/DateRangeIteratorTest.java  | 56 +++++++++++
 3 files changed, 195 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ece2858e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareCopyableGlobDatasetFinder.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareCopyableGlobDatasetFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareCopyableGlobDatasetFinder.java
new file mode 100644
index 0000000..fb857bd
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareCopyableGlobDatasetFinder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import java.io.IOException;
+import java.util.Properties;
+import 
org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * {@link 
org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder}
 that returns datasets of type
+ * {@link 
org.apache.gobblin.data.management.copy.TimeAwareRecursiveCopyableDataset}.
+ */
+public class TimeAwareCopyableGlobDatasetFinder extends 
ConfigurableGlobDatasetFinder<CopyableDataset> {
+
+  public TimeAwareCopyableGlobDatasetFinder(FileSystem fs, Properties props) {
+    super(fs, props);
+  }
+
+  @Override
+  public CopyableDataset datasetAtPath(Path path) throws IOException {
+    return new TimeAwareRecursiveCopyableDataset(this.fs, path, this.props, 
this.datasetPattern);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ece2858e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
new file mode 100644
index 0000000..91b5bb4
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+
+public class TimeAwareRecursiveCopyableDataset extends 
RecursiveCopyableDataset {
+  private static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + 
".recursive";
+  public static final String DATE_PATTERN_KEY = CONFIG_PREFIX + 
".date.pattern";
+  public static final String LOOKBACK_DAYS_KEY = CONFIG_PREFIX + 
".lookback.days";
+
+  private final Integer lookbackDays;
+  private final String datePattern;
+
+  public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath, 
Properties properties, Path glob) {
+    super(fs, rootPath, properties, glob);
+    this.lookbackDays = 
Integer.parseInt(properties.getProperty(LOOKBACK_DAYS_KEY));
+    this.datePattern = properties.getProperty(DATE_PATTERN_KEY);
+  }
+
+  public static class DateRangeIterator implements Iterator {
+    private LocalDateTime startDate;
+    private LocalDateTime endDate;
+    private boolean isDatePatternHourly;
+
+    public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate, 
String datePattern) {
+      this.startDate = startDate;
+      this.endDate = endDate;
+      this.isDatePatternHourly = isDatePatternHourly(datePattern);
+    }
+
+    private boolean isDatePatternHourly(String datePattern) {
+      DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern);
+      LocalDateTime refDateTime = LocalDateTime.of(2017, 01, 01, 10, 0, 0);
+      String refDateTimeString = refDateTime.format(formatter);
+      LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHour(0);
+      String refDateTimeStringAtStartOfDay = 
refDateTimeAtStartOfDay.format(formatter);
+      return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return !startDate.isAfter(endDate);
+    }
+
+    @Override
+    public LocalDateTime next() {
+      LocalDateTime dateTime = startDate;
+      startDate = isDatePatternHourly ? startDate.plusHours(1) : 
startDate.plusDays(1);
+      return dateTime;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, 
PathFilter fileFilter) throws IOException {
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern);
+    LocalDateTime endDate = LocalDateTime.now();
+    LocalDateTime startDate = endDate.minusDays(lookbackDays);
+    DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, 
endDate, datePattern);
+    List<FileStatus> fileStatuses = Lists.newArrayList();
+    while (dateRangeIterator.hasNext()) {
+      Path pathWithDateTime = new Path(path, 
dateRangeIterator.next().format(formatter));
+      fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, 
fileFilter));
+    }
+    return fileStatuses;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ece2858e/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
new file mode 100644
index 0000000..49d5408
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class DateRangeIteratorTest {
+
+  @Test
+  public void testIterator() {
+    LocalDateTime endDate = LocalDateTime.of(2017, 1, 1, 0, 0, 0);
+    LocalDateTime startDate = endDate.minusHours(2);
+    String datePattern = "HH/yyyy/MM/dd";
+    DateTimeFormatter format = DateTimeFormatter.ofPattern(datePattern);
+    TimeAwareRecursiveCopyableDataset.DateRangeIterator dateRangeIterator =
+        new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, 
endDate, datePattern);
+    LocalDateTime dateTime = dateRangeIterator.next();
+    Assert.assertEquals(dateTime.format(format), "22/2016/12/31");
+    dateTime = dateRangeIterator.next();
+    Assert.assertEquals(dateTime.format(format), "23/2016/12/31");
+    dateTime = dateRangeIterator.next();
+    Assert.assertEquals(dateTime.format(format), "00/2017/01/01");
+    Assert.assertEquals(dateRangeIterator.hasNext(), false);
+
+    datePattern = "yyyy/MM/dd";
+    format = DateTimeFormatter.ofPattern(datePattern);
+    startDate = endDate.minusDays(1);
+    dateRangeIterator = new 
TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, 
datePattern);
+    dateTime = dateRangeIterator.next();
+    Assert.assertEquals(dateTime.format(format), "2016/12/31");
+    dateTime = dateRangeIterator.next();
+    Assert.assertEquals(dateTime.format(format), "2017/01/01");
+    Assert.assertEquals(dateRangeIterator.hasNext(), false);
+  }
+}
\ No newline at end of file

Reply via email to