Repository: incubator-gobblin Updated Branches: refs/heads/master 749b5bd6a -> ef438c872
[GOBBLIN-573] Add option to use finer, hour-level granularity for TimeAwareDatasetFinder Closes #2438 from sv2000/hourly Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ef438c87 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ef438c87 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ef438c87 Branch: refs/heads/master Commit: ef438c872625704b39014741a110f54901c7dfab Parents: 749b5bd Author: sv2000 <[email protected]> Authored: Fri Aug 31 09:08:48 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Fri Aug 31 09:08:48 2018 -0700 ---------------------------------------------------------------------- .../copy/TimeAwareRecursiveCopyableDataset.java | 71 ++++--- .../management/copy/DateRangeIteratorTest.java | 25 +-- .../TimeAwareRecursiveCopyableDatasetTest.java | 189 +++++++++++++++++++ 3 files changed, 251 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef438c87/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java index 91b5bb4..41e7ae1 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java @@ -17,31 +17,47 @@ package org.apache.gobblin.data.management.copy; -import com.google.common.collect.Lists; import java.io.IOException; 
-import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; import java.util.Iterator; import java.util.List; import java.util.Properties; + import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.joda.time.LocalDateTime; +import org.joda.time.Period; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.PeriodFormatter; +import org.joda.time.format.PeriodFormatterBuilder; +import org.testng.Assert; +import com.google.common.collect.Lists; public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset { private static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + ".recursive"; public static final String DATE_PATTERN_KEY = CONFIG_PREFIX + ".date.pattern"; - public static final String LOOKBACK_DAYS_KEY = CONFIG_PREFIX + ".lookback.days"; + public static final String LOOKBACK_TIME_KEY = CONFIG_PREFIX + ".lookback.time"; - private final Integer lookbackDays; + private final String lookbackTime; private final String datePattern; + private final Period lookbackPeriod; + private final boolean isPatternHourly; public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath, Properties properties, Path glob) { super(fs, rootPath, properties, glob); - this.lookbackDays = Integer.parseInt(properties.getProperty(LOOKBACK_DAYS_KEY)); + this.lookbackTime = properties.getProperty(LOOKBACK_TIME_KEY); + PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter(); + this.lookbackPeriod = periodFormatter.parsePeriod(lookbackTime); this.datePattern = properties.getProperty(DATE_PATTERN_KEY); + this.isPatternHourly = isDatePatternHourly(datePattern); + + //Daily directories cannot have a "hourly" lookback pattern. But hourly directories can accept lookback pattern with days. 
+ if (!this.isPatternHourly) { + Assert.assertTrue(isLookbackTimeStringDaily(this.lookbackTime), "Expected day format for lookback time; found hourly format"); + } } public static class DateRangeIterator implements Iterator { @@ -49,19 +65,10 @@ public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset private LocalDateTime endDate; private boolean isDatePatternHourly; - public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate, String datePattern) { + public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate, boolean isDatePatternHourly) { this.startDate = startDate; this.endDate = endDate; - this.isDatePatternHourly = isDatePatternHourly(datePattern); - } - - private boolean isDatePatternHourly(String datePattern) { - DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern); - LocalDateTime refDateTime = LocalDateTime.of(2017, 01, 01, 10, 0, 0); - String refDateTimeString = refDateTime.format(formatter); - LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHour(0); - String refDateTimeStringAtStartOfDay = refDateTimeAtStartOfDay.format(formatter); - return !refDateTimeString.equals(refDateTimeStringAtStartOfDay); + this.isDatePatternHourly = isDatePatternHourly; } @Override @@ -72,7 +79,7 @@ public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset @Override public LocalDateTime next() { LocalDateTime dateTime = startDate; - startDate = isDatePatternHourly ? startDate.plusHours(1) : startDate.plusDays(1); + startDate = this.isDatePatternHourly ? 
startDate.plusHours(1) : startDate.plusDays(1); return dateTime; } @@ -82,15 +89,35 @@ public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset } } + private boolean isDatePatternHourly(String datePattern) { + DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern); + LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0); + String refDateTimeString = refDateTime.toString(formatter); + LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0); + String refDateTimeStringAtStartOfDay = refDateTimeAtStartOfDay.toString(formatter); + return !refDateTimeString.equals(refDateTimeStringAtStartOfDay); + } + + private boolean isLookbackTimeStringDaily(String lookbackTime) { + PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter(); + try { + periodFormatter.parsePeriod(lookbackTime); + return true; + } catch (Exception e) { + return false; + } + } + @Override protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException { - DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern); + DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern); LocalDateTime endDate = LocalDateTime.now(); - LocalDateTime startDate = endDate.minusDays(lookbackDays); - DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, datePattern); + LocalDateTime startDate = endDate.minus(this.lookbackPeriod); + + DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate, endDate, isPatternHourly); List<FileStatus> fileStatuses = Lists.newArrayList(); while (dateRangeIterator.hasNext()) { - Path pathWithDateTime = new Path(path, dateRangeIterator.next().format(formatter)); + Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter)); fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter)); } return fileStatuses; 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef438c87/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java index 49d5408..3c3b312 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/DateRangeIteratorTest.java @@ -17,8 +17,9 @@ package org.apache.gobblin.data.management.copy; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; +import org.joda.time.LocalDateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; import org.testng.Assert; import org.testng.annotations.Test; @@ -29,28 +30,28 @@ public class DateRangeIteratorTest { @Test public void testIterator() { - LocalDateTime endDate = LocalDateTime.of(2017, 1, 1, 0, 0, 0); + LocalDateTime endDate = new LocalDateTime(2017, 1, 1, 0, 0, 0); LocalDateTime startDate = endDate.minusHours(2); String datePattern = "HH/yyyy/MM/dd"; - DateTimeFormatter format = DateTimeFormatter.ofPattern(datePattern); + DateTimeFormatter format = DateTimeFormat.forPattern(datePattern); TimeAwareRecursiveCopyableDataset.DateRangeIterator dateRangeIterator = - new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, datePattern); + new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, true); LocalDateTime dateTime = dateRangeIterator.next(); - Assert.assertEquals(dateTime.format(format), "22/2016/12/31"); + Assert.assertEquals(dateTime.toString(format), "22/2016/12/31"); dateTime = dateRangeIterator.next(); - 
Assert.assertEquals(dateTime.format(format), "23/2016/12/31"); + Assert.assertEquals(dateTime.toString(format), "23/2016/12/31"); dateTime = dateRangeIterator.next(); - Assert.assertEquals(dateTime.format(format), "00/2017/01/01"); + Assert.assertEquals(dateTime.toString(format), "00/2017/01/01"); Assert.assertEquals(dateRangeIterator.hasNext(), false); datePattern = "yyyy/MM/dd"; - format = DateTimeFormatter.ofPattern(datePattern); + format = DateTimeFormat.forPattern(datePattern); startDate = endDate.minusDays(1); - dateRangeIterator = new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, datePattern); + dateRangeIterator = new TimeAwareRecursiveCopyableDataset.DateRangeIterator(startDate, endDate, false); dateTime = dateRangeIterator.next(); - Assert.assertEquals(dateTime.format(format), "2016/12/31"); + Assert.assertEquals(dateTime.toString(format), "2016/12/31"); dateTime = dateRangeIterator.next(); - Assert.assertEquals(dateTime.format(format), "2017/01/01"); + Assert.assertEquals(dateTime.toString(format), "2017/01/01"); Assert.assertEquals(dateRangeIterator.hasNext(), false); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef438c87/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java new file mode 100644 index 0000000..f2ed2cb --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.joda.time.LocalDateTime; +import org.joda.time.Period; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.PeriodFormatter; +import org.joda.time.format.PeriodFormatterBuilder; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import org.apache.gobblin.util.PathUtils; +import org.apache.gobblin.util.filters.HiddenFilter; + +public class TimeAwareRecursiveCopyableDatasetTest { + private FileSystem fs; + private Path baseDir1; + private Path baseDir2; + + private static final String NUM_LOOKBACK_DAYS_STR = "2d"; + private static final Integer NUM_LOOKBACK_DAYS = 2; + private static final String NUM_LOOKBACK_HOURS_STR = "4h"; + private static final Integer NUM_LOOKBACK_HOURS = 4; + private static final Integer 
MAX_NUM_DAILY_DIRS = 4; + private static final Integer MAX_NUM_HOURLY_DIRS = 48; + private static final String NUM_LOOKBACK_DAYS_HOURS_STR = "1d1h"; + private static final Integer NUM_DAYS_HOURS_DIRS = 25; + + @BeforeClass + public void setUp() throws IOException { + Assert.assertTrue(NUM_LOOKBACK_DAYS < MAX_NUM_DAILY_DIRS); + Assert.assertTrue(NUM_LOOKBACK_HOURS < MAX_NUM_HOURLY_DIRS); + + this.fs = FileSystem.getLocal(new Configuration()); + + baseDir1 = new Path("/tmp/src/ds1/hourly"); + if (fs.exists(baseDir1)) { + fs.delete(baseDir1, true); + } + fs.mkdirs(baseDir1); + + baseDir2 = new Path("/tmp/src/ds1/daily"); + if (fs.exists(baseDir2)) { + fs.delete(baseDir2, true); + } + fs.mkdirs(baseDir2); + PeriodFormatter formatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter(); + Period period = formatter.parsePeriod(NUM_LOOKBACK_DAYS_HOURS_STR); + } + + @Test + public void testGetFilesAtPath() throws IOException { + String datePattern = "yyyy/MM/dd/HH"; + DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern); + + LocalDateTime endDate = LocalDateTime.now(); + + Set<String> candidateFiles = new HashSet<>(); + for (int i = 0; i < MAX_NUM_HOURLY_DIRS; i++) { + String startDate = endDate.minusHours(i).toString(formatter); + Path subDirPath = new Path(baseDir1, new Path(startDate)); + fs.mkdirs(subDirPath); + Path filePath = new Path(subDirPath, i + ".avro"); + fs.create(filePath); + if (i < (NUM_LOOKBACK_HOURS + 1)) { + candidateFiles.add(filePath.toString()); + } + } + + //Lookback time = "4h" + Properties properties = new Properties(); + properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_HOURS_STR); + properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd/HH"); + + PathFilter pathFilter = new HiddenFilter(); + TimeAwareRecursiveCopyableDataset dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir1, properties, + new 
Path("/tmp/src/*/hourly")); + List<FileStatus> fileStatusList = dataset.getFilesAtPath(fs, baseDir1, pathFilter); + + Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_HOURS + 1); + + for (FileStatus fileStatus: fileStatusList) { + Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString())); + } + + //Lookback time = "1d1h" + properties = new Properties(); + properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_HOURS_STR); + properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd/HH"); + dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir1, properties, + new Path("/tmp/src/*/hourly")); + fileStatusList = dataset.getFilesAtPath(fs, baseDir1, pathFilter); + candidateFiles = new HashSet<>(); + datePattern = "yyyy/MM/dd/HH"; + formatter = DateTimeFormat.forPattern(datePattern); + + for (int i = 0; i < MAX_NUM_HOURLY_DIRS; i++) { + String startDate = endDate.minusHours(i).toString(formatter); + Path subDirPath = new Path(baseDir1, new Path(startDate)); + Path filePath = new Path(subDirPath, i + ".avro"); + if (i < NUM_DAYS_HOURS_DIRS + 1) { + candidateFiles.add(filePath.toString()); + } + } + + Assert.assertEquals(fileStatusList.size(), NUM_DAYS_HOURS_DIRS + 1); + for (FileStatus fileStatus: fileStatusList) { + Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString())); + } + + //Lookback time = "2d" + datePattern = "yyyy/MM/dd"; + formatter = DateTimeFormat.forPattern(datePattern); + endDate = LocalDateTime.now(); + + candidateFiles = new HashSet<>(); + for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) { + String startDate = endDate.minusDays(i).toString(formatter); + Path subDirPath = new Path(baseDir2, new Path(startDate)); + fs.mkdirs(subDirPath); + Path filePath = new Path(subDirPath, i + ".avro"); + fs.create(filePath); + if (i < (NUM_LOOKBACK_DAYS + 1)) { + 
candidateFiles.add(filePath.toString()); + } + } + + properties = new Properties(); + properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_STR); + properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd"); + + dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir2, properties, + new Path("/tmp/src/*/daily")); + fileStatusList = dataset.getFilesAtPath(fs, baseDir2, pathFilter); + + Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_DAYS + 1); + for (FileStatus fileStatus: fileStatusList) { + Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString())); + } + } + + @Test (expectedExceptions = AssertionError.class) + public void testInstantiationError() { + //Daily directories, but look back time has days and hours. We should expect an assertion error. + Properties properties = new Properties(); + properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_HOURS_STR); + properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd"); + + TimeAwareRecursiveCopyableDataset dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir2, properties, + new Path("/tmp/src/*/daily")); + } + + @AfterClass + public void clean() throws IOException { + //Delete tmp directories + this.fs.delete(baseDir1, true); + this.fs.delete(baseDir2, true); + } +} \ No newline at end of file
