This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0700092 [GOBBLIN-1001] Implement TimePartitionGlobFinder
0700092 is described below
commit 070009231770e04aeb9a74a9d03865cb74cf4094
Author: zhchen <[email protected]>
AuthorDate: Fri Dec 20 17:20:53 2019 -0800
[GOBBLIN-1001] Implement TimePartitionGlobFinder
Closes #2846 from zxcware/comp
---
.../verify/PinotAuditCountVerifierTest.java | 14 +-
.../copy/TimeAwareRecursiveCopyableDataset.java | 4 +
.../dataset/DefaultFileSystemGlobFinder.java | 12 +-
...lobFinder.java => SimpleFileSystemDataset.java} | 57 +++--
.../dataset/TimePartitionGlobFinder.java | 268 +++++++++++++++++++++
.../java/org/apache/gobblin/time/TimeIterator.java | 75 ++++++
.../dataset/TimePartitionedGlobFinderTest.java | 208 ++++++++++++++++
7 files changed, 593 insertions(+), 45 deletions(-)
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
index f9c855a..2c4372f 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.compaction.audit.AuditCountClient;
import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
import org.apache.gobblin.dataset.FileSystemDataset;
/**
@@ -51,17 +52,8 @@ public class PinotAuditCountVerifierTest {
final String inputSub = "hourly";
final String outputSub = "hourly";
TestAuditCountClient client = new TestAuditCountClient();
- FileSystemDataset dataset = new FileSystemDataset() {
- @Override
- public Path datasetRoot() {
- return new Path (input + topic + inputSub + "/2017/04/03/10");
- }
-
- @Override
- public String datasetURN() {
- return input + topic + inputSub + "/2017/04/03/10";
- }
- };
+ FileSystemDataset dataset = new SimpleFileSystemDataset(
+ new Path(input + topic + inputSub + "/2017/04/03/10"));
State props = new State();
props.setProp (CompactionAuditCountVerifier.PRODUCER_TIER, PRODUCER_TIER);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
index 56f772a..4802d62 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
@@ -84,6 +84,10 @@ public class TimeAwareRecursiveCopyableDataset extends
RecursiveCopyableDataset
}
}
+ /**
+ * TODO: Replace it with {@link org.apache.gobblin.time.TimeIterator}, because {@link LocalDateTime}
+ * carries no time zone, so the iterated dates cannot be adjusted to a given time zone
+ */
public static class DateRangeIterator implements Iterator {
private LocalDateTime startDate;
private LocalDateTime endDate;
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
index b3c7805..c94e8d4 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
@@ -33,16 +33,6 @@ public class DefaultFileSystemGlobFinder extends
ConfigurableGlobDatasetFinder<F
}
+ /**
+ * Create the dataset at {@code path} as a non-virtual {@link SimpleFileSystemDataset}, whose
+ * URN is the string form of {@code path}
+ */
public FileSystemDataset datasetAtPath(final Path path) throws IOException {
- return new FileSystemDataset() {
- @Override
- public Path datasetRoot() {
- return path;
- }
-
- @Override
- public String datasetURN() {
- return path.toString();
- }
- };
+ return new SimpleFileSystemDataset(path);
}
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
similarity index 53%
copy from
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
copy to
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
index b3c7805..338596c 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
@@ -14,35 +14,46 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.gobblin.data.management.dataset;
-import
org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
-import org.apache.gobblin.dataset.FileSystemDataset;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import java.io.IOException;
-import java.util.Properties;
+
+import org.apache.gobblin.dataset.FileSystemDataset;
+
/**
- * A subclass of {@link ConfigurableGlobDatasetFinder} which find all the
{@link FileSystemDataset}
- * that matches a given glob pattern.
+ * A basic implementation of {@link FileSystemDataset}. It can represent a
virtual
+ * file system dataset which doesn't have a physical file/folder
*/
-public class DefaultFileSystemGlobFinder extends
ConfigurableGlobDatasetFinder<FileSystemDataset> {
- public DefaultFileSystemGlobFinder(FileSystem fs, Properties properties)
throws IOException {
- super(fs, properties);
+public class SimpleFileSystemDataset implements FileSystemDataset {
+
+ /** Root of the dataset; its string form is also the dataset URN */
+ private final Path path;
+ /** Whether the dataset has no physical file/folder, see {@link #getIsVirtual()} */
+ private final boolean isVirtual;
+
+ /**
+ * Create a non-virtual dataset rooted at {@code path}
+ */
+ public SimpleFileSystemDataset(Path path) {
+ this(path, false);
+ }
+
+ /**
+ * @param path root of the dataset
+ * @param isVirtual true if the dataset doesn't have a physical file/folder
+ */
+ public SimpleFileSystemDataset(Path path, boolean isVirtual) {
+ this.path = path;
+ this.isVirtual = isVirtual;
+ }
+
+ @Override
+ public Path datasetRoot() {
+ return path;
+ }
+
+ /**
+ * @return the string form of {@link #datasetRoot()}
+ */
+ @Override
+ public String datasetURN() {
+ return path.toString();
}
- public FileSystemDataset datasetAtPath(final Path path) throws IOException {
- return new FileSystemDataset() {
- @Override
- public Path datasetRoot() {
- return path;
- }
-
- @Override
- public String datasetURN() {
- return path.toString();
- }
- };
+ /**
+ * @return true if the dataset doesn't have a physical file/folder
+ */
+ public boolean getIsVirtual() {
+ return isVirtual;
}
-}
+}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
new file mode 100644
index 0000000..8a707cc
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.time.TimeIterator;
+import org.apache.gobblin.util.PathUtils;
+
+
+/**
+ * A {@link TimePartitionGlobFinder} finds all dataset time partitions within the time window
+ * [current time - look back time, current time]. It derives an efficient dataset partition glob pattern
+ * based on the time window and a supported {@value #TIME_FORMAT}.
+ *
+ * <p> A partition is selected when the window overlaps it: the first partition is the one containing the
+ * window start and the last one is the one containing the current time.
+ *
+ * <p> If {@value #ENABLE_VIRTUAL_PARTITION} is set, it creates virtual {@link SimpleFileSystemDataset}
+ * instances for the partitions within the time window that don't exist on the file system.
+ */
+@Slf4j
+public class TimePartitionGlobFinder implements DatasetsFinder<FileSystemDataset> {
+  private static final String CONF_PREFIX = "timePartitionGlobFinder.";
+
+  /**
+   * String inserted between a dataset root and the formatted partition time, e.g. {@code hourly/}
+   */
+  public static final String PARTITION_PREFIX = CONF_PREFIX + "partitionPrefix";
+  /**
+   * {@link DateTimeFormatter} pattern of a partition, which must start with {@code yyyy/MM} or {@code yyyy-MM}
+   */
+  public static final String TIME_FORMAT = CONF_PREFIX + "timeFormat";
+  public static final String ENABLE_VIRTUAL_PARTITION = CONF_PREFIX + "enableVirtualPartition";
+  /**
+   * Options are enumerated in {@link org.apache.gobblin.time.TimeIterator.Granularity}
+   */
+  public static final String GRANULARITY = CONF_PREFIX + "granularity";
+  /**
+   * Time zone of the current time, {@value #DEFAULT_TIME_ZONE} by default
+   */
+  public static final String TIME_ZONE = CONF_PREFIX + "timeZone";
+  /**
+   * An ISO-8601 duration accepted by {@link Duration#parse(CharSequence)}, e.g. {@code P2D} or {@code PT12H}
+   */
+  public static final String LOOKBACK_SPEC = CONF_PREFIX + "lookbackSpec";
+
+  private static final String DEFAULT_TIME_ZONE = "America/Los_Angeles";
+
+  private static final Pattern SUPPORTED_TIME_FORMAT = Pattern.compile("(yyyy/MM(/.*)*)|(yyyy-MM(-.*)*)");
+
+  /** Glob pattern of the datasets, as configured by the caller */
+  private final String datasetPattern;
+  /** Glob pattern matching the time partitions of all datasets */
+  private final String datasetPartitionPattern;
+  private final String partitionPrefix;
+  private final DateTimeFormatter timeFormatter;
+  private final boolean enableVirtualPartition;
+
+  /** Window start, aligned down to the beginning of its {@link #granularity} unit */
+  private final ZonedDateTime startTime;
+  private final ZonedDateTime endTime;
+  private final TimeIterator.Granularity granularity;
+
+  /** Caller's properties; the dataset pattern key is temporarily overridden while finding datasets */
+  private final Properties props;
+  private final FileSystem fs;
+
+  public TimePartitionGlobFinder(FileSystem fs, Properties properties) {
+    this(fs, properties, ZonedDateTime.now(ZoneId.of(properties.getProperty(TIME_ZONE, DEFAULT_TIME_ZONE))));
+  }
+
+  /**
+   * @param curTime the current time, i.e. the end of the time window
+   * @throws IllegalArgumentException if a required property is missing
+   * @throws IllegalStateException if the time format is not supported
+   */
+  @VisibleForTesting
+  TimePartitionGlobFinder(FileSystem fs, Properties properties, ZonedDateTime curTime) {
+    datasetPattern = getRequiredProperty(properties, ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY);
+    Path datasetPath = new Path(datasetPattern);
+
+    partitionPrefix = properties.getProperty(PARTITION_PREFIX, "");
+    String timeFormat = getRequiredProperty(properties, TIME_FORMAT).trim();
+    Preconditions.checkState(isTimeFormatSupported(timeFormat), "Unsupported time format %s, expecting %s",
+        timeFormat, SUPPORTED_TIME_FORMAT);
+    timeFormatter = DateTimeFormatter.ofPattern(timeFormat);
+
+    // Locale.ROOT: locale sensitive upper casing (e.g. the Turkish dotted i) would break the enum lookup
+    granularity = TimeIterator.Granularity.valueOf(
+        getRequiredProperty(properties, GRANULARITY).trim().toUpperCase(Locale.ROOT));
+
+    endTime = curTime;
+    Duration lookback = Duration.parse(getRequiredProperty(properties, LOOKBACK_SPEC));
+    // Align the window start to its partition boundary so that every partition overlapping the window is
+    // iterated. Otherwise, e.g. with DAY granularity, a start at 23:00 steps to 23:00 of the current day,
+    // which is after the current time, and the current day's partition would be skipped
+    startTime = alignToGranularity(endTime.minus(lookback), granularity);
+
+    datasetPartitionPattern = new Path(datasetPath,
+        partitionPrefix + derivePartitionPattern(startTime, endTime, timeFormat)).toString();
+    log.info("Dataset partition pattern is {}", datasetPartitionPattern);
+
+    enableVirtualPartition = Boolean.parseBoolean(properties.getProperty(ENABLE_VIRTUAL_PARTITION, "false"));
+
+    props = properties;
+    this.fs = fs;
+  }
+
+  /**
+   * @return the value of {@code key} in {@code properties}
+   * @throws IllegalArgumentException if {@code key} is not set
+   */
+  private static String getRequiredProperty(Properties properties, String key) {
+    String value = properties.getProperty(key);
+    Preconditions.checkArgument(value != null, "Missing required property %s", key);
+    return value;
+  }
+
+  /**
+   * Move {@code time} back to the beginning of the {@code granularity} unit containing it
+   */
+  @VisibleForTesting
+  static ZonedDateTime alignToGranularity(ZonedDateTime time, TimeIterator.Granularity granularity) {
+    switch (granularity) {
+      case MINUTE:
+        return time.truncatedTo(ChronoUnit.MINUTES);
+      case HOUR:
+        return time.truncatedTo(ChronoUnit.HOURS);
+      case DAY:
+        return time.truncatedTo(ChronoUnit.DAYS);
+      case MONTH:
+        return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1);
+      default:
+        throw new IllegalArgumentException("Unsupported granularity " + granularity);
+    }
+  }
+
+  /**
+   * The finder supports time format matching {@link #SUPPORTED_TIME_FORMAT}
+   */
+  @VisibleForTesting
+  static boolean isTimeFormatSupported(String timeFormat) {
+    return SUPPORTED_TIME_FORMAT.matcher(timeFormat).matches();
+  }
+
+  /**
+   * Derive partition glob pattern from time format. It tries its best to provide
+   * a fine pattern by refining year and month options from reasoning
+   * start time, end time and {@link #SUPPORTED_TIME_FORMAT}
+   */
+  @VisibleForTesting
+  static String derivePartitionPattern(ZonedDateTime start, ZonedDateTime end, String timeFormat) {
+    // Refine year options
+    int startYear = start.getYear();
+    int endYear = end.getYear();
+    StringBuilder yearOptions = new StringBuilder("{" + startYear);
+    appendOptions(yearOptions, startYear + 1, endYear);
+    yearOptions.append("}");
+
+    // Get month options
+    StringBuilder monthOptions = buildMonthOptions(start, end);
+
+    // Decide by the year/month separator, not by any '-' in the format, so that a format like
+    // "yyyy/MM/dd-HH" is still handled as a slash separated one
+    boolean dashSeparated = timeFormat.startsWith("yyyy-");
+    StringBuilder pattern = new StringBuilder(yearOptions);
+    if (dashSeparated) {
+      pattern.append("-");
+      pattern.append(monthOptions);
+      // The rest of the first path component (e.g. "-dd-HH") is matched by a trailing "*", unless the
+      // month option is already a "*", which covers it
+      if (!monthOptions.toString().equals("*")) {
+        pattern.append("*");
+      }
+    } else {
+      pattern.append("/");
+      pattern.append(monthOptions);
+    }
+
+    // A glob "*" doesn't cross "/", so every remaining path component needs its own "/*". Year and month
+    // take the first component of a dash separated format and the first two of a slash separated one
+    String[] components = timeFormat.split("/");
+    for (int i = dashSeparated ? 1 : 2; i < components.length; i++) {
+      pattern.append("/*");
+    }
+
+    return pattern.toString();
+  }
+
+  /**
+   * Refine month options
+   *
+   * @return {@code "*"} if the window covers every month of a year, otherwise a brace list of the
+   *         months from start to end, wrapping around December
+   */
+  private static StringBuilder buildMonthOptions(ZonedDateTime start, ZonedDateTime end) {
+    int startMonth = start.getMonthValue();
+    int endMonth = end.getMonthValue();
+    int yearDiff = end.getYear() - start.getYear();
+    if (yearDiff > 1 || (yearDiff == 1 && endMonth >= startMonth)) {
+      // All 12 months
+      return new StringBuilder("*");
+    }
+    StringBuilder monthOptions = new StringBuilder("{" + startMonth);
+    if (endMonth >= startMonth) {
+      appendOptions(monthOptions, startMonth + 1, endMonth);
+    } else {
+      // from [startMonth + 1, 12] of start year
+      appendOptions(monthOptions, startMonth + 1, 12);
+      // from [1, endMonth] of current year
+      appendOptions(monthOptions, 1, endMonth);
+    }
+    monthOptions.append("}");
+    return monthOptions;
+  }
+
+  /**
+   * Append {@code ",NN"} for every value in [{@code start}, {@code end}], zero padded to 2 digits
+   */
+  private static void appendOptions(StringBuilder stringBuilder, int start, int end) {
+    for (int i = start; i <= end; i++) {
+      stringBuilder.append(",");
+      if (i < 10) {
+        stringBuilder.append("0");
+      }
+      stringBuilder.append(i);
+    }
+  }
+
+  @Override
+  public List<FileSystemDataset> findDatasets() throws IOException {
+    try {
+      return doFindDatasets();
+    } finally {
+      // findDatasetsMatching overrides the dataset pattern in the caller's properties; restore it
+      this.props.setProperty(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY, datasetPattern);
+    }
+  }
+
+  private List<FileSystemDataset> doFindDatasets() throws IOException {
+    // Find datasets
+    List<FileSystemDataset> datasets = findDatasetsMatching(datasetPattern);
+
+    // Compute partitions in theory based on startTime and endTime. A LinkedHashSet keeps the
+    // iteration order, so virtual partitions are returned in dataset and time order
+    Set<String> computedPartitions = new LinkedHashSet<>();
+    for (FileSystemDataset dataset : datasets) {
+      computedPartitions.addAll(computePartitions(dataset));
+    }
+
+    // This is the final result
+    List<FileSystemDataset> resultPartitions = new ArrayList<>(computedPartitions.size());
+
+    // Keep the physical dataset time partitions that are expected; Set.remove tells whether it was expected
+    for (FileSystemDataset physicalPartition : findDatasetsMatching(datasetPartitionPattern)) {
+      if (computedPartitions.remove(physicalPartition.datasetRoot().toString())) {
+        resultPartitions.add(physicalPartition);
+      }
+    }
+
+    // Partitions left in computedPartitions have no physical file/folder
+    if (enableVirtualPartition) {
+      for (String partition : computedPartitions) {
+        log.info("Creating virtual partition {}", partition);
+        resultPartitions.add(new SimpleFileSystemDataset(new Path(partition), true));
+      }
+    } else {
+      log.info("Will not create virtual partitions");
+    }
+
+    return resultPartitions;
+  }
+
+  /**
+   * @return the paths of all partitions of {@code dataset} from {@link #startTime} to {@link #endTime}
+   *         in steps of {@link #granularity}, formatted with {@link #timeFormatter}
+   */
+  private Collection<String> computePartitions(FileSystemDataset dataset) {
+    List<String> partitions = new ArrayList<>();
+    TimeIterator iterator = new TimeIterator(startTime, endTime, granularity);
+    while (iterator.hasNext()) {
+      partitions.add(new Path(dataset.datasetRoot(),
+          partitionPrefix + timeFormatter.format(iterator.next())).toString());
+    }
+    return partitions;
+  }
+
+  /**
+   * Find the datasets matching glob {@code pattern} with a {@link DefaultFileSystemGlobFinder}. It sets
+   * {@link ConfigurableGlobDatasetFinder#DATASET_FINDER_PATTERN_KEY} of {@link #props} to {@code pattern};
+   * {@link #findDatasets()} restores the original value
+   */
+  private List<FileSystemDataset> findDatasetsMatching(String pattern) throws IOException {
+    this.props.setProperty(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY, pattern);
+    DefaultFileSystemGlobFinder datasetFinder = new DefaultFileSystemGlobFinder(this.fs, this.props);
+    return datasetFinder.findDatasets();
+  }
+
+  @Override
+  public Path commonDatasetRoot() {
+    return PathUtils.deepestNonGlobPath(new Path(this.datasetPattern));
+  }
+}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
new file mode 100644
index 0000000..4630a7a
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.time;
+
+import java.time.ZonedDateTime;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+
+/**
+ * A {@link TimeIterator} iterates over time points within [{@code startTime}, {@code endTime}]. It
+ * supports time points in various granularities (see {@link Granularity}).
+ *
+ * <p> The n-th time point is {@code startTime} plus n granularity units, rather than the previous point plus
+ * one unit. This keeps the iteration anchored to {@code startTime}: a monthly iteration from Jan 31 yields
+ * Feb 28 (or 29), Mar 31, Apr 30, ... instead of drifting to the 28th, and a daily iteration keeps its local
+ * time after passing a daylight saving gap.
+ */
+public class TimeIterator implements Iterator<ZonedDateTime> {
+
+  public enum Granularity {
+    MINUTE, HOUR, DAY, MONTH
+  }
+
+  private final ZonedDateTime startTime;
+  private final ZonedDateTime endTime;
+  private final Granularity granularity;
+  /** Number of time points returned so far */
+  private long steps;
+  /** The time point the next call to {@link #next()} returns */
+  private ZonedDateTime current;
+
+  /**
+   * @throws NullPointerException if any argument is null
+   */
+  public TimeIterator(ZonedDateTime startTime, ZonedDateTime endTime, Granularity granularity) {
+    this.startTime = Objects.requireNonNull(startTime, "startTime");
+    this.endTime = Objects.requireNonNull(endTime, "endTime");
+    this.granularity = Objects.requireNonNull(granularity, "granularity");
+    this.current = startTime;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !current.isAfter(endTime);
+  }
+
+  /**
+   * @throws NoSuchElementException if the iteration is already past {@code endTime}
+   */
+  @Override
+  public ZonedDateTime next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No time point after " + endTime);
+    }
+    ZonedDateTime dateTime = current;
+    steps++;
+    current = advance(steps);
+    return dateTime;
+  }
+
+  /**
+   * @return {@code startTime} moved forward by {@code units} units of {@link #granularity}
+   */
+  private ZonedDateTime advance(long units) {
+    switch (granularity) {
+      case MINUTE:
+        return startTime.plusMinutes(units);
+      case HOUR:
+        return startTime.plusHours(units);
+      case DAY:
+        return startTime.plusDays(units);
+      case MONTH:
+        return startTime.plusMonths(units);
+      default:
+        throw new IllegalStateException("Unsupported granularity " + granularity);
+    }
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
new file mode 100644
index 0000000..fca8bb3
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.dataset.FileSystemDataset;
+
+
+public class TimePartitionedGlobFinderTest {
+
+  private Path testRootDir;
+  private FileSystem localFs;
+  private ZoneId zone;
+  /** Fixed "current time" of the finder, so that the expected partitions are deterministic */
+  private ZonedDateTime curTime;
+
+  @BeforeClass
+  private void setup() throws IOException {
+    localFs = FileSystem.getLocal(new Configuration());
+    testRootDir = new Path(Paths.get("").toAbsolutePath().toString(), getClass().getSimpleName());
+    if (localFs.exists(testRootDir)) {
+      localFs.delete(testRootDir, true);
+    }
+    localFs.mkdirs(testRootDir);
+    localFs.deleteOnExit(testRootDir);
+    zone = ZoneId.of("America/Los_Angeles");
+    LocalDateTime localTime = LocalDateTime.of(2019, 1, 1, 0, 0);
+    curTime = ZonedDateTime.of(localTime, zone);
+  }
+
+  /**
+   * Hourly data lives in {@code <dataset>/hourly/yyyy/MM/dd/HH}. The finder looks for the day partitions
+   * {@code <dataset>/hourly/yyyy/MM/dd} of the last 2 days, i.e. 2018/12/30, 2018/12/31 and 2019/01/01.
+   */
+  @Test
+  public void testDayPartitions() throws IOException {
+    String hourlyFormat = "yyyy/MM/dd/HH";
+    String hourlyPrefix = "hourly/";
+    String dayFormat = "yyyy/MM/dd";
+
+    // Create dataset /db1/table1 with 2 hours of data on the current day (2019/01/01)
+    Path ds1 = createDatasetPath("db1/table1");
+    // Create dataset /db2/table2 with 2 hours of data on the previous day (2018/12/31)
+    Path ds2 = createDatasetPath("db2/table2");
+    createPartitions(ds1, hourlyPrefix, 0, 2, hourlyFormat);
+    createPartitions(ds2, hourlyPrefix, -1, 2, hourlyFormat);
+
+    String datasetPattern = new Path(testRootDir, "*/*").toString();
+
+    // Test glob finder without creating virtual partitions: only the physical day partitions are found
+    Properties props = new Properties();
+    props.setProperty("gobblin.dataset.pattern", datasetPattern);
+    props.setProperty("timePartitionGlobFinder.partitionPrefix", hourlyPrefix);
+    props.setProperty("timePartitionGlobFinder.timeFormat", dayFormat);
+    props.setProperty("timePartitionGlobFinder.lookbackSpec", "P2D");
+    props.setProperty("timePartitionGlobFinder.granularity", "DAY");
+    TimePartitionGlobFinder finder = new TimePartitionGlobFinder(localFs, props, curTime);
+    List<FileSystemDataset> datasets = finder.findDatasets();
+    Assert.assertEquals(datasets.size(), 2);
+    // Verify the only physical day partition of each dataset is found
+    Assert.assertNotNull(find(getPartitionPath(ds1, hourlyPrefix, 0, dayFormat), datasets));
+    Assert.assertNotNull(find(getPartitionPath(ds2, hourlyPrefix, -1, dayFormat), datasets));
+
+    // Test glob finder with creating virtual partitions: each dataset gets all 3 day partitions of the window
+    props.setProperty("timePartitionGlobFinder.enableVirtualPartition", "true");
+    finder = new TimePartitionGlobFinder(localFs, props, curTime);
+    datasets = finder.findDatasets();
+    Assert.assertEquals(datasets.size(), 6);
+    // Verify partitions for /db1/table1: the physical one is not virtual, the missing ones are
+    Assert.assertFalse(findExisting(getPartitionPath(ds1, hourlyPrefix, 0, dayFormat), datasets).getIsVirtual());
+    Assert.assertTrue(findExisting(getPartitionPath(ds1, hourlyPrefix, -1, dayFormat), datasets).getIsVirtual());
+    Assert.assertTrue(findExisting(getPartitionPath(ds1, hourlyPrefix, -2, dayFormat), datasets).getIsVirtual());
+    // Verify partitions for /db2/table2: the physical one is not virtual, the missing ones are
+    Assert.assertFalse(findExisting(getPartitionPath(ds2, hourlyPrefix, -1, dayFormat), datasets).getIsVirtual());
+    Assert.assertTrue(findExisting(getPartitionPath(ds2, hourlyPrefix, 0, dayFormat), datasets).getIsVirtual());
+    Assert.assertTrue(findExisting(getPartitionPath(ds2, hourlyPrefix, -2, dayFormat), datasets).getIsVirtual());
+  }
+
+  /**
+   * @return the path of the {@code dataset} partition {@code dayOffset} days away from {@link #curTime}
+   */
+  private Path getPartitionPath(Path dataset, String prefix, int dayOffset, String format) {
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
+    return new Path(dataset, prefix + formatter.format(curTime.plusDays(dayOffset)));
+  }
+
+  /**
+   * @return the dataset in {@code list} whose root is {@code path}, or null if there is none
+   */
+  private SimpleFileSystemDataset find(Path path, List<FileSystemDataset> list) {
+    for (FileSystemDataset dataset : list) {
+      if (dataset.datasetRoot().equals(path)) {
+        return (SimpleFileSystemDataset) dataset;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Like {@link #find(Path, List)}, but fails the test with a descriptive message if there is no such dataset
+   */
+  private SimpleFileSystemDataset findExisting(Path path, List<FileSystemDataset> list) {
+    SimpleFileSystemDataset dataset = find(path, list);
+    Assert.assertNotNull(dataset, "Missing partition " + path);
+    return dataset;
+  }
+
+  private Path createDatasetPath(String dataset) throws IOException {
+    Path datasetPath = new Path(testRootDir, dataset);
+    localFs.mkdirs(datasetPath);
+    return datasetPath;
+  }
+
+  /**
+   * Create {@code hours} hourly partitions, from hour 0 of the day {@code dayOffset} days away from
+   * {@link #curTime}, each holding one data file
+   */
+  private void createPartitions(Path dataset, String prefix, int dayOffset, int hours, String format)
+      throws IOException {
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
+    ZonedDateTime dayTime = curTime.plusDays(dayOffset);
+    for (int i = 0; i < hours; i++) {
+      Path hourPath = new Path(formatter.format(dayTime.withHour(i)));
+      Path datasetPartitionPath = new Path(dataset, prefix + hourPath);
+      Path dataFile = new Path(datasetPartitionPath, "dataFile");
+      try (OutputStream outputStream = localFs.create(dataFile, true)) {
+        outputStream.write(i);
+      }
+    }
+  }
+
+  @Test
+  public void testDerivePartitionPattern() {
+    String slashTimeFormat = "yyyy/MM/dd";
+    String dashTimeFormat = "yyyy-MM-dd";
+
+    // 2019/12/1 - 2019/12/3
+    LocalDateTime localTime = LocalDateTime.of(2019, 12, 3, 0, 0);
+    ZonedDateTime end = ZonedDateTime.of(localTime, zone);
+    ZonedDateTime start = end.withDayOfMonth(1);
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat),
+        "{2019}/{12}/*");
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, dashTimeFormat),
+        "{2019}-{12}*");
+
+    // 2019/11/30 - 2019/12/3
+    start = end.withMonth(11).withDayOfMonth(30);
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat),
+        "{2019}/{11,12}/*");
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, dashTimeFormat),
+        "{2019}-{11,12}*");
+
+    // 2018/12/1 - 2019/12/3
+    start = end.withYear(2018).withMonth(12).withDayOfMonth(1);
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat),
+        "{2018,2019}/*/*");
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, dashTimeFormat),
+        "{2018,2019}-*");
+
+    // 2018/11/30 - 2019/12/3
+    start = end.withYear(2018).withMonth(11).withDayOfMonth(30);
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat),
+        "{2018,2019}/*/*");
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, dashTimeFormat),
+        "{2018,2019}-*");
+
+    // 2018/11/30 - 2019/01/3
+    end = end.withMonth(1);
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat),
+        "{2018,2019}/{11,12,01}/*");
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, dashTimeFormat),
+        "{2018,2019}-{11,12,01}*");
+
+    // Test hourly
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, "yyyy/MM/dd/HH"),
+        "{2018,2019}/{11,12,01}/*/*");
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, "yyyy-MM-dd-HH"),
+        "{2018,2019}-{11,12,01}*");
+  }
+
+  @Test
+  public void testSupportedTimeFormat() {
+    Assert.assertTrue(TimePartitionGlobFinder.isTimeFormatSupported("yyyy/MM/dd/HH"));
+    Assert.assertTrue(TimePartitionGlobFinder.isTimeFormatSupported("yyyy/MM/dd"));
+    Assert.assertFalse(TimePartitionGlobFinder.isTimeFormatSupported("MM/dd/yyyy"));
+    Assert.assertTrue(TimePartitionGlobFinder.isTimeFormatSupported("yyyy-MM-dd"));
+    Assert.assertTrue(TimePartitionGlobFinder.isTimeFormatSupported("yyyy-MM-dd-HH"));
+    Assert.assertFalse(TimePartitionGlobFinder.isTimeFormatSupported("MM-dd-yyyy"));
+  }
+
+  @AfterClass
+  private void cleanup() throws IOException {
+    if (localFs != null && localFs.exists(testRootDir)) {
+      localFs.delete(testRootDir, true);
+    }
+  }
+}