Repository: incubator-gobblin Updated Branches: refs/heads/master 55dafd67e -> ece2858ec
[GOBBLIN-365] Add lookback days config property for CopyableGlobDatasetFinder Closes #2238 from sv2000/master Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ece2858e Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ece2858e Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ece2858e Branch: refs/heads/master Commit: ece2858ec60a2616390e6c72ffdc30d7bca2a2ff Parents: 55dafd6 Author: suvasude <[email protected]> Authored: Thu Jan 11 12:02:11 2018 -0800 Committer: Issac Buenrostro <[email protected]> Committed: Thu Jan 11 12:02:11 2018 -0800 ---------------------------------------------------------------------- .../TimeAwareCopyableGlobDatasetFinder.java | 41 ++++++++ .../copy/TimeAwareRecursiveCopyableDataset.java | 98 ++++++++++++++++++++ .../management/copy/DateRangeIteratorTest.java | 56 +++++++++++ 3 files changed, 195 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ece2858e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareCopyableGlobDatasetFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareCopyableGlobDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareCopyableGlobDatasetFinder.java new file mode 100644 index 0000000..fb857bd --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareCopyableGlobDatasetFinder.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy; + +import java.io.IOException; +import java.util.Properties; +import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * {@link org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder} that returns datasets of type + * {@link org.apache.gobblin.data.management.copy.TimeAwareRecursiveCopyableDataset}.N + */ +public class TimeAwareCopyableGlobDatasetFinder extends ConfigurableGlobDatasetFinder<CopyableDataset> { + + public TimeAwareCopyableGlobDatasetFinder(FileSystem fs, Properties props) { + super(fs, props); + } + + @Override + public CopyableDataset datasetAtPath(Path path) throws IOException { + return new TimeAwareRecursiveCopyableDataset(this.fs, path, this.props, this.datasetPattern); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ece2858e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java new file mode 100644 index 0000000..91b5bb4 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy; + +import com.google.common.collect.Lists; +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + + +public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset { + private static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + ".recursive"; + public static final String DATE_PATTERN_KEY = CONFIG_PREFIX + ".date.pattern"; + public static final String LOOKBACK_DAYS_KEY = CONFIG_PREFIX + ".lookback.days"; + + private final Integer lookbackDays; + private final String datePattern; + + public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath, Properties properties, Path glob) { + super(fs, rootPath, properties, glob); + this.lookbackDays = Integer.parseInt(properties.getProperty(LOOKBACK_DAYS_KEY)); + this.datePattern = properties.getProperty(DATE_PATTERN_KEY); + } + + public static class DateRangeIterator implements Iterator { + private LocalDateTime startDate; + private LocalDateTime endDate; + private boolean isDatePatternHourly; + + public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate, String datePattern) { + this.startDate = startDate; + this.endDate = endDate; + this.isDatePatternHourly = isDatePatternHourly(datePattern); + } + + private boolean isDatePatternHourly(String datePattern) { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern); + LocalDateTime refDateTime = LocalDateTime.of(2017, 01, 01, 10, 0, 0); + String refDateTimeString = refDateTime.format(formatter); + LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHour(0); + String refDateTimeStringAtStartOfDay = refDateTimeAtStartOfDay.format(formatter); + return !refDateTimeString.equals(refDateTimeStringAtStartOfDay); + } + + @Override + public boolean hasNext() { + return !startDate.isAfter(endDate); + } + + @Override + public LocalDateTime next() { + LocalDateTime dateTime = startDate; + startDate = isDatePatternHourly ? startDate.plusHours(1) : startDate.plusDays(1); + return dateTime; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + @Override + protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern); + LocalDateTime endDate = LocalDateTime.now(); + LocalDateTime startDate = endDate.minusDays(lookbackDays); + DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, datePattern); + List<FileStatus> fileStatuses = Lists.newArrayList(); + while (dateRangeIterator.hasNext()) { + Path pathWithDateTime = new Path(path, dateRangeIterator.next().format(formatter)); + fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter)); + } + return fileStatuses; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ece2858e/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java new file mode 100644 index 0000000..49d5408 --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class DateRangeIteratorTest { + + @Test + public void testIterator() { + LocalDateTime endDate = LocalDateTime.of(2017, 1, 1, 0, 0, 0); + LocalDateTime startDate = endDate.minusHours(2); + String datePattern = "HH/yyyy/MM/dd"; + DateTimeFormatter format = DateTimeFormatter.ofPattern(datePattern); + TimeAwareRecursiveCopyableDataset.DateRangeIterator dateRangeIterator = + new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, datePattern); + LocalDateTime dateTime = dateRangeIterator.next(); + Assert.assertEquals(dateTime.format(format), "22/2016/12/31"); + dateTime = dateRangeIterator.next(); + Assert.assertEquals(dateTime.format(format), "23/2016/12/31"); + dateTime = dateRangeIterator.next(); + Assert.assertEquals(dateTime.format(format), "00/2017/01/01"); + Assert.assertEquals(dateRangeIterator.hasNext(), false); + + datePattern = "yyyy/MM/dd"; + format = DateTimeFormatter.ofPattern(datePattern); + startDate = endDate.minusDays(1); + dateRangeIterator = new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, datePattern); + dateTime = dateRangeIterator.next(); + Assert.assertEquals(dateTime.format(format), "2016/12/31"); + dateTime = dateRangeIterator.next(); + Assert.assertEquals(dateTime.format(format), "2017/01/01"); + Assert.assertEquals(dateRangeIterator.hasNext(), false); + } +} \ No newline at end of file
