Repository: incubator-gobblin Updated Branches: refs/heads/master 3b64cf70f -> 85d724c64
Add lead time functionality to DatePartitionedRetrievers Add lead time functionality to DatePartitionedRetrievers. Directories that were created after (now() - lead time) will be ignored by the source. minor style tweaks Closes #2003 from eogren/datepartitionleadtime Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/85d724c6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/85d724c6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/85d724c6 Branch: refs/heads/master Commit: 85d724c64dc5ab842992a19bdfa59b3ee3d8c24f Parents: 3b64cf7 Author: Eric Ogren <[email protected]> Authored: Sat Jul 29 01:15:33 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Sat Jul 29 01:15:33 2017 -0700 ---------------------------------------------------------------------- .../source/DatePartitionedNestedRetriever.java | 6 ++- .../PartitionAwareFileRetrieverUtils.java | 55 ++++++++++++++++++++ .../source/PartitionedFileSourceBase.java | 16 ++++++ .../source/RegexBasedPartitionedRetriever.java | 16 ++++-- .../RegexBasedPartitionedRetrieverTest.java | 26 ++++++++- .../DatePartitionedAvroFileExtractorTest.java | 43 +++++++++++++-- .../java/gobblin/util/DatePartitionType.java | 19 +++++-- .../gobblin/util/DatePartitionTypeTest.java | 29 +++++++++++ 8 files changed, 194 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java b/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java index 0dc2f9c..c0bc1dc 100644 --- a/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java +++ b/gobblin-core/src/main/java/gobblin/source/DatePartitionedNestedRetriever.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import org.joda.time.Duration; import org.joda.time.DurationFieldType; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -46,6 +47,7 @@ import gobblin.util.DatePartitionType; import static gobblin.source.PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN; + /** * PartitionRetriever that is optimized for nested directory structures where data is dumped on a regular basis * and most data has likely been processed by Gobblin already. @@ -69,6 +71,7 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev private FileSystem fs; private HadoopFsHelper helper; private final String expectedExtension; + private Duration leadTimeDuration; public DatePartitionedNestedRetriever(String expectedExtension) { this.expectedExtension = expectedExtension; @@ -86,13 +89,14 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev this.sourcePartitionSuffix = state.getProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_SUFFIX, StringUtils.EMPTY); this.sourceDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY)); + this.leadTimeDuration = PartitionAwareFileRetrieverUtils.getLeadTimeDurationFromConfig(state); this.helper = new HadoopFsHelper(state); } @Override public List<FileInfo> getFilesToProcess(long minWatermark, int maxFilesToReturn) throws IOException { - DateTime currentDay = new DateTime(); + DateTime currentDay = new DateTime().minus(leadTimeDuration); DateTime lowWaterMarkDate = new DateTime(minWatermark); List<FileInfo> filesToProcess = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/main/java/gobblin/source/PartitionAwareFileRetrieverUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/gobblin/source/PartitionAwareFileRetrieverUtils.java b/gobblin-core/src/main/java/gobblin/source/PartitionAwareFileRetrieverUtils.java new file mode 100644 index 0000000..926267d --- /dev/null +++ b/gobblin-core/src/main/java/gobblin/source/PartitionAwareFileRetrieverUtils.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.source; + +import org.joda.time.Duration; + +import gobblin.configuration.State; +import gobblin.util.DatePartitionType; + +import static gobblin.source.PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME; +import static gobblin.source.PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY; +import static gobblin.source.PartitionedFileSourceBase.DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY; +import static gobblin.source.PartitionedFileSourceBase.DEFAULT_PARTITIONED_SOURCE_PARTITION_LEAD_TIME; + + +/** + * Utility functions for parsing configuration parameters commonly used by {@link PartitionAwareFileRetriever} + * objects. + */ +public class PartitionAwareFileRetrieverUtils { + /** + * Retrieve the lead time duration from the LEAD_TIME and LEAD_TIME granularity config settings. + */ + public static Duration getLeadTimeDurationFromConfig(State state) { + String leadTimeProp = state.getProp(DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME); + if (leadTimeProp == null || leadTimeProp.length() == 0) { + return DEFAULT_PARTITIONED_SOURCE_PARTITION_LEAD_TIME; + } + + int leadTime = Integer.parseInt(leadTimeProp); + + DatePartitionType leadTimeGranularity = DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY; + + String leadTimeGranularityProp = state.getProp(DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY); + if (leadTimeGranularityProp != null) { + leadTimeGranularity = DatePartitionType.valueOf(leadTimeGranularityProp); + } + + return new Duration(leadTime * leadTimeGranularity.getUnitMilliseconds()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java b/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java index 041538c..de26450 100644 --- a/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java +++ b/gobblin-core/src/main/java/gobblin/source/PartitionedFileSourceBase.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.joda.time.Duration; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.slf4j.Logger; @@ -77,6 +78,21 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS DatePartitionType.HOUR; /** + * The partition 'lead time' allows a job to ignore a date partition for a given amount of time. + * For example, if the lead_time is set to 1 day and the job is run on Jul 1 2017 at 2am, the job + * will only process partitions from Jun 30 2017 at 2am and before. + */ + public static final String DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME = + DATE_PARTITIONED_SOURCE_PREFIX + ".partition.lead_time.size"; + public static final Duration DEFAULT_PARTITIONED_SOURCE_PARTITION_LEAD_TIME = new Duration(0); + + public static final String DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY = + DATE_PARTITIONED_SOURCE_PREFIX + ".partition.lead_time.granularity"; + + public static final DatePartitionType DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY = + DEFAULT_DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY; + + /** * A String of the format defined by {@link #DATE_PARTITIONED_SOURCE_PARTITION_PATTERN} or * {@link #DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY}. For example for yyyy/MM/dd the * 2015/01/01 corresponds to January 1st, 2015. The job will start reading data from this point in time. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java b/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java index 9f43b6c..8ad8341 100644 --- a/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java +++ b/gobblin-core/src/main/java/gobblin/source/RegexBasedPartitionedRetriever.java @@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.joda.time.DateTime; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +47,7 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev private HadoopFsHelper helper; private Path sourceDir; private final String expectedExtension; + private Duration leadTime; public RegexBasedPartitionedRetriever(String expectedExtension) { this.expectedExtension = expectedExtension; @@ -57,6 +60,7 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN ); + this.leadTime = PartitionAwareFileRetrieverUtils.getLeadTimeDurationFromConfig(state); this.pattern = Pattern.compile(regexPattern); this.helper = new HadoopFsHelper(state); this.sourceDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY)); @@ -87,12 +91,14 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev throws IOException { // This implementation assumes snapshots are always in the root directory and the number of them // remains relatively small + long maxAllowedWatermark = new DateTime().minus(leadTime).getMillis(); + try { this.helper.connect(); FileSystem fs = helper.getFileSystem(); List<FileInfo> filesToProcess = new ArrayList<>(); - List<FileInfo> outerDirectories = getOuterDirectories(fs, minWatermark); + List<FileInfo> outerDirectories = getOuterDirectories(fs, minWatermark, maxAllowedWatermark); for (FileInfo outerDirectory: outerDirectories) { FileStatus[] files = fs.listStatus( new Path(outerDirectory.getFilePath()), @@ -117,7 +123,7 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev } } - private List<FileInfo> getOuterDirectories(FileSystem fs, long minWatermark) throws IOException { + private List<FileInfo> getOuterDirectories(FileSystem fs, long minWatermark, long maxAllowedWatermark) throws IOException { LOGGER.debug("Listing contents of {}", sourceDir); FileStatus[] fileStatus = fs.listStatus(sourceDir); @@ -133,7 +139,7 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev long watermark = getWatermarkFromString( extractWatermarkFromDirectory(file.getPath().getName()) ); - if (watermark > minWatermark) { + if (watermark > minWatermark && watermark < maxAllowedWatermark) { LOGGER.info("Processing directory {} with watermark {}", file.getPath(), watermark); @@ -143,8 +149,8 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev watermark )); } else { - LOGGER.info("Ignoring directory {} - watermark {} is less than minWatermark {}", file.getPath(), watermark, - minWatermark); + LOGGER.info("Ignoring directory {} - watermark {} is not between minWatermark {} and (now-leadTime) {}", + file.getPath(), watermark, minWatermark, maxAllowedWatermark); } } catch (IllegalArgumentException e) { LOGGER.info("Directory {} ({}) does not match pattern {}; skipping", file.getPath().getName(), http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java b/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java index 9a01b3a..208a5ae 100644 --- a/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java +++ b/gobblin-core/src/test/java/gobblin/source/RegexBasedPartitionedRetrieverTest.java @@ -24,6 +24,7 @@ import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.List; +import org.joda.time.DateTime; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -37,7 +38,8 @@ public class RegexBasedPartitionedRetrieverTest { private Path tempDir; private enum DateToUse { - APR_1_2017(1491004800000L), APR_3_2017(1491177600000L), MAY_1_2017(1493596800000L); + APR_1_2017(1491004800000L), APR_3_2017(1491177600000L), MAY_1_2017(1493596800000L), + TWENTY_THREE_HOURS_AGO(new DateTime().minusHours(23).getMillis()); private final long value; @@ -93,11 +95,31 @@ public class RegexBasedPartitionedRetrieverTest { r.init(state); List<PartitionAwareFileRetriever.FileInfo> files = r.getFilesToProcess(DateToUse.APR_3_2017.getValue() - 1, 9999); - Assert.assertEquals(files.size(), 2); + Assert.assertEquals(files.size(), 3); verifyFile(files.get(0), DateToUse.APR_3_2017.getValue()); verifyFile(files.get(1), DateToUse.MAY_1_2017.getValue()); + verifyFile(files.get(2), DateToUse.TWENTY_THREE_HOURS_AGO.getValue()); + } + @Test + public void testLeadtime() throws IOException { + String snapshotRegex = "(\\d+)-PT-\\d+"; + RegexBasedPartitionedRetriever r = new RegexBasedPartitionedRetriever("txt"); + SourceState state = new SourceState(); + state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///"); + state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY, tempDir.toString()); + state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN, + snapshotRegex); + state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME_GRANULARITY, "DAY"); + state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_LEAD_TIME, "1"); + r.init(state); + + List<PartitionAwareFileRetriever.FileInfo> files = r.getFilesToProcess(DateToUse.APR_3_2017.getValue() - 1, 9999); + Assert.assertEquals(files.size(), 2); + + verifyFile(files.get(0), DateToUse.APR_3_2017.getValue()); + verifyFile(files.get(1), DateToUse.MAY_1_2017.getValue()); } private void verifyFile(PartitionAwareFileRetriever.FileInfo fileInfo, long value) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java b/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java index 577f4b8..145c522 100644 --- a/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java +++ b/gobblin-core/src/test/java/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java @@ -98,13 +98,15 @@ public class DatePartitionedAvroFileExtractorTest { this.schema = new Schema.Parser().parse(AVRO_SCHEMA); //set up datetime objects - DateTime now = new DateTime(TZ).minusHours(2); + DateTime now = new DateTime(TZ).minusHours(6); this.startDateTime = new DateTime(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), now.getHourOfDay(), 30, 0, TZ); //create records, shift their timestamp by 1 minute DateTime recordDt = startDateTime; - for (int i = 0; i < RECORD_SIZE; i++) { + recordTimestamps[0] = recordDt.getMillis(); + recordDt = recordDt.plusHours(4); + for (int i = 1; i < RECORD_SIZE; i++) { recordDt = recordDt.plusMinutes(1); recordTimestamps[i] = recordDt.getMillis(); } @@ -170,6 +172,37 @@ public class DatePartitionedAvroFileExtractorTest { } @Test + public void testReadPartitionsByMinuteWithLeadtime() throws IOException, DataRecordException { + + DatePartitionedAvroFileSource source = new DatePartitionedAvroFileSource(); + + SourceState state = new SourceState(); + state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI); + state.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, SOURCE_ENTITY); + state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY, OUTPUT_DIR + Path.SEPARATOR + SOURCE_ENTITY); + state.setProp(ConfigurationKeys.SOURCE_ENTITY, SOURCE_ENTITY); + state.setProp(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, 2); + + state.setProp("date.partitioned.source.partition.pattern", DATE_PATTERN); + state.setProp("date.partitioned.source.min.watermark.value", DateTimeFormat.forPattern(DATE_PATTERN).print( + this.startDateTime.minusMinutes(1))); + state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, TableType.SNAPSHOT_ONLY); + state.setProp("date.partitioned.source.partition.prefix", PREFIX); + state.setProp("date.partitioned.source.partition.suffix", SUFFIX); + state.setProp("date.partitioned.source.partition.lead_time.size", "3"); + state.setProp("date.partitioned.source.partition.lead_time.granularity", "HOUR"); + + /* + * Since lead time is 3 hours, only the first WorkUnit (which is 6 hours old, rest are 2hrs) should get + * picked up + */ + List<WorkUnit> workunits = source.getWorkunits(state); + + Assert.assertEquals(workunits.size(), 1); + verifyWorkUnits(workunits, workunits.size()); + } + + @Test public void testWorksNoPrefix() throws IOException, DataRecordException { DatePartitionedAvroFileSource source = new DatePartitionedAvroFileSource(); @@ -195,7 +228,11 @@ public class DatePartitionedAvroFileExtractorTest { private void verifyWorkUnits(List<WorkUnit> workunits) throws IOException, DataRecordException { - for (int i = 0; i < RECORD_SIZE; i++) { + verifyWorkUnits(workunits, RECORD_SIZE); + } + + private void verifyWorkUnits(List<WorkUnit> workunits, int expectedSize) throws DataRecordException, IOException { + for (int i = 0; i < expectedSize; i++) { WorkUnit workUnit = ((MultiWorkUnit) workunits.get(i)).getWorkUnits().get(0); WorkUnitState wuState = new WorkUnitState(workunits.get(i), new State()); wuState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java b/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java index 5834914..be8be66 100644 --- a/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java +++ b/gobblin-utility/src/main/java/gobblin/util/DatePartitionType.java @@ -17,12 +17,13 @@ import java.util.Map; import org.joda.time.DateTime; import org.joda.time.DateTimeFieldType; +import org.joda.time.chrono.ISOChronology; /** * Temporal granularity types for writing ({@link gobblin.writer.partitioner.TimeBasedWriterPartitioner}) and reading * ({@link gobblin.source.DatePartitionedAvroFileSource}) date partitioned data. - * + * * @author Lorand Bendig * */ @@ -62,7 +63,7 @@ public enum DatePartitionType { /** * @param pattern full partitioning pattern - * @return a DateTimeFieldType corresponding to the smallest temporal unit in the pattern. + * @return a DateTimeFieldType corresponding to the smallest temporal unit in the pattern. * E.g for yyyy/MM/dd {@link DateTimeFieldType#dayOfMonth()} */ public static DateTimeFieldType getLowestIntervalUnit(String pattern) { @@ -75,15 +76,23 @@ public enum DatePartitionType { } return intervalUnit; } - + + /** + * Get the number of milliseconds associated with a partition type. Eg + * getUnitMilliseconds() of DatePartitionType.MINUTE = 60,000. + */ + public long getUnitMilliseconds() { + return dateTimeField.getDurationType().getField(ISOChronology.getInstance()).getUnitMillis(); + } + public DateTimeFieldType getDateTimeFieldType() { return dateTimeField; } - + public int getField(DateTime dateTime) { return dateTime.get(this.dateTimeField); } - + public String getDateTimePattern() { return dateTimePattern; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85d724c6/gobblin-utility/src/test/java/gobblin/util/DatePartitionTypeTest.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/test/java/gobblin/util/DatePartitionTypeTest.java b/gobblin-utility/src/test/java/gobblin/util/DatePartitionTypeTest.java new file mode 100644 index 0000000..a46d258 --- /dev/null +++ b/gobblin-utility/src/test/java/gobblin/util/DatePartitionTypeTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.util; + +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class DatePartitionTypeTest { + @Test + public void testGetMillis() { + Assert.assertEquals(DatePartitionType.MINUTE.getUnitMilliseconds(), 60000L); + Assert.assertEquals(DatePartitionType.HOUR.getUnitMilliseconds(), 60 * 60 * 1000L); + } +}
