This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0700092  [GOBBLIN-1001] Implement TimePartitionGlobFinder
0700092 is described below

commit 070009231770e04aeb9a74a9d03865cb74cf4094
Author: zhchen <[email protected]>
AuthorDate: Fri Dec 20 17:20:53 2019 -0800

    [GOBBLIN-1001] Implement TimePartitionGlobFinder
    
    Closes #2846 from zxcware/comp
---
 .../verify/PinotAuditCountVerifierTest.java        |  14 +-
 .../copy/TimeAwareRecursiveCopyableDataset.java    |   4 +
 .../dataset/DefaultFileSystemGlobFinder.java       |  12 +-
 ...lobFinder.java => SimpleFileSystemDataset.java} |  57 +++--
 .../dataset/TimePartitionGlobFinder.java           | 268 +++++++++++++++++++++
 .../java/org/apache/gobblin/time/TimeIterator.java |  75 ++++++
 .../dataset/TimePartitionedGlobFinderTest.java     | 208 ++++++++++++++++
 7 files changed, 593 insertions(+), 45 deletions(-)

diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
index f9c855a..2c4372f 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.compaction.audit.AuditCountClient;
 import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 
 /**
@@ -51,17 +52,8 @@ public class PinotAuditCountVerifierTest {
     final String inputSub = "hourly";
     final String outputSub = "hourly";
     TestAuditCountClient client = new TestAuditCountClient();
-    FileSystemDataset dataset = new FileSystemDataset() {
-      @Override
-      public Path datasetRoot() {
-        return new Path (input + topic + inputSub + "/2017/04/03/10");
-      }
-
-      @Override
-      public String datasetURN() {
-        return input + topic + inputSub + "/2017/04/03/10";
-      }
-    };
+    FileSystemDataset dataset = new SimpleFileSystemDataset(
+        new Path(input + topic + inputSub + "/2017/04/03/10"));
 
     State props = new State();
     props.setProp (CompactionAuditCountVerifier.PRODUCER_TIER, PRODUCER_TIER);
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
index 56f772a..4802d62 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java
@@ -84,6 +84,10 @@ public class TimeAwareRecursiveCopyableDataset extends 
RecursiveCopyableDataset
     }
   }
 
+  /**
+   * TODO: Replace it with {@link org.apache.gobblin.time.TimeIterator} as 
{@link LocalDateTime} will not adjust time
+   * to a given time zone
+   */
   public static class DateRangeIterator implements Iterator {
     private LocalDateTime startDate;
     private LocalDateTime endDate;
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
index b3c7805..c94e8d4 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
@@ -33,16 +33,6 @@ public class DefaultFileSystemGlobFinder extends 
ConfigurableGlobDatasetFinder<F
   }
 
   public FileSystemDataset datasetAtPath(final Path path) throws IOException {
-    return new FileSystemDataset() {
-      @Override
-      public Path datasetRoot() {
-        return path;
-      }
-
-      @Override
-      public String datasetURN() {
-        return path.toString();
-      }
-    };
+    return new SimpleFileSystemDataset(path);
   }
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
similarity index 53%
copy from 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
copy to 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
index b3c7805..338596c 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DefaultFileSystemGlobFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
@@ -14,35 +14,46 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.gobblin.data.management.dataset;
 
-import 
org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
-import org.apache.gobblin.dataset.FileSystemDataset;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import java.io.IOException;
-import java.util.Properties;
+
+import org.apache.gobblin.dataset.FileSystemDataset;
+
 
 /**
- * A subclass of {@link ConfigurableGlobDatasetFinder} which find all the 
{@link FileSystemDataset}
- * that matches a given glob pattern.
+ * A basic implementation of {@link FileSystemDataset}. It can represent a 
virtual
+ * file system dataset which doesn't have a physical file/folder
  */
-public class DefaultFileSystemGlobFinder extends 
ConfigurableGlobDatasetFinder<FileSystemDataset> {
-  public DefaultFileSystemGlobFinder(FileSystem fs, Properties properties) 
throws IOException {
-    super(fs, properties);
+public class SimpleFileSystemDataset implements FileSystemDataset {
+
+  private final Path path;
+  private final boolean isVirtual;
+
+  public SimpleFileSystemDataset(Path path) {
+    this(path, false);
+  }
+
+  public SimpleFileSystemDataset(Path path, boolean isVirtual) {
+    this.path = path;
+    this.isVirtual = isVirtual;
+  }
+
+  @Override
+  public Path datasetRoot() {
+    return path;
+  }
+
+  @Override
+  public String datasetURN() {
+    return path.toString();
   }
 
-  public FileSystemDataset datasetAtPath(final Path path) throws IOException {
-    return new FileSystemDataset() {
-      @Override
-      public Path datasetRoot() {
-        return path;
-      }
-
-      @Override
-      public String datasetURN() {
-        return path.toString();
-      }
-    };
+  /**
+   * @return true if the dataset doesn't have a physical file/folder
+   */
+  public boolean getIsVirtual() {
+    return isVirtual;
   }
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
new file mode 100644
index 0000000..8a707cc
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import 
org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.time.TimeIterator;
+import org.apache.gobblin.util.PathUtils;
+
+
+/**
+ * A {@link TimePartitionGlobFinder} finds all dataset time partitions within 
time window
+ * [current time - look back time, current time]. It derives an efficient 
dataset partition pattern based
+ * on the time window and a supported {@value #TIME_FORMAT}.
+ *
+ * <p> If {@value #ENABLE_VIRTUAL_PARTITION} is set, it will create virtual 
{@link SimpleFileSystemDataset}
+ * instances if a partition within the time window doesn't exist
+ */
+@Slf4j
+public class TimePartitionGlobFinder implements 
DatasetsFinder<FileSystemDataset> {
+  private static final String CONF_PREFIX = "timePartitionGlobFinder.";
+
+  public static final String PARTITION_PREFIX = CONF_PREFIX + 
"partitionPrefix";
+  public static final String TIME_FORMAT = CONF_PREFIX + "timeFormat";
+  public static final String ENABLE_VIRTUAL_PARTITION = CONF_PREFIX + 
"enableVirtualPartition";
+  /**
+   * Options are enumerated in {@link 
org.apache.gobblin.time.TimeIterator.Granularity}
+   */
+  public static final String GRANULARITY = CONF_PREFIX + "granularity";
+  public static final String TIME_ZONE = CONF_PREFIX + "timeZone";
+  public static final String LOOKBACK_SPEC = CONF_PREFIX + "lookbackSpec";
+
+  private static final String DEFAULT_TIME_ZONE = "America/Los_Angeles";
+
+  private static final Pattern SUPPORTED_TIME_FORMAT = 
Pattern.compile("(yyyy/MM(/.*)*)|(yyyy-MM(-.*)*)");
+
+  private final String datasetPattern;
+  private final String datasetPartitionPattern;
+  private final String partitionPrefix;
+  private final DateTimeFormatter timeFormatter;
+  private final boolean enableVirtualPartition;
+
+  private final ZonedDateTime startTime;
+  private final ZonedDateTime endTime;
+  private final TimeIterator.Granularity granularity;
+
+  private final Properties props;
+  private final FileSystem fs;
+
+  public TimePartitionGlobFinder(FileSystem fs, Properties properties) {
+    this(fs, properties,
+        ZonedDateTime.now(ZoneId.of(properties.getProperty(TIME_ZONE, 
DEFAULT_TIME_ZONE))));
+  }
+
+  @VisibleForTesting
+  TimePartitionGlobFinder(FileSystem fs, Properties properties, ZonedDateTime 
curTime) {
+    datasetPattern = 
properties.getProperty(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY);
+    Path datasetPath = new Path(datasetPattern);
+
+    partitionPrefix = properties.getProperty(PARTITION_PREFIX, "");
+    String timeFormat = properties.getProperty(TIME_FORMAT).trim();
+    Preconditions.checkState(isTimeFormatSupported(timeFormat),
+        String.format("Unsupported time format %s, expecting %s", timeFormat, 
SUPPORTED_TIME_FORMAT));
+    timeFormatter = DateTimeFormatter.ofPattern(timeFormat);
+
+    endTime = curTime;
+    Duration lookback = Duration.parse(properties.getProperty(LOOKBACK_SPEC));
+    startTime = endTime.minus(lookback);
+    granularity = 
TimeIterator.Granularity.valueOf(properties.getProperty(GRANULARITY).toUpperCase());
+
+    datasetPartitionPattern = new Path(datasetPath,
+        partitionPrefix + derivePartitionPattern(startTime, endTime, 
timeFormat)).toString();
+    log.info("Dataset partition pattern is {}", datasetPartitionPattern);
+
+    enableVirtualPartition = 
Boolean.valueOf(properties.getProperty(ENABLE_VIRTUAL_PARTITION, "false"));
+
+    props = properties;
+    this.fs = fs;
+  }
+
+  /**
+   * The finder supports time format matching {@link #SUPPORTED_TIME_FORMAT}
+   */
+  @VisibleForTesting
+  static boolean isTimeFormatSupported(String timeFormat) {
+    return SUPPORTED_TIME_FORMAT.matcher(timeFormat).matches();
+  }
+
+  /**
+   * Derive the partition glob pattern from the time format. It narrows the
+   * pattern as far as it can by restricting the year and month options based on
+   * the start time, the end time and {@link #SUPPORTED_TIME_FORMAT}
+   */
+  @VisibleForTesting
+  static String derivePartitionPattern(ZonedDateTime start,
+      ZonedDateTime end, String timeFormat) {
+    // Refine year options
+    int startYear = start.getYear();
+    int endYear = end.getYear();
+    StringBuilder yearOptions = new StringBuilder("{" + startYear);
+    appendOptions(yearOptions, startYear + 1, endYear);
+    yearOptions.append("}");
+
+    // Get month options
+    StringBuilder monthOptions = buildMonthOptions(start, end);
+
+    StringBuilder pattern = new StringBuilder(yearOptions);
+    if (timeFormat.contains("-")) {
+      pattern.append("-");
+      pattern.append(monthOptions);
+      // A bare "*" month already matches the rest of the name; otherwise add a wildcard for it
+      if (!monthOptions.toString().equals("*")) {
+        pattern.append("*");
+      }
+    } else {
+      pattern.append("/");
+      pattern.append(monthOptions);
+      String[] parts = timeFormat.split("/");
+      // We already processed year and month components
+      for (int i = 2; i < parts.length; i++) {
+        pattern.append("/*");
+      }
+    }
+
+    return pattern.toString();
+  }
+
+  /**
+   * Build the glob options for the month component of the partition pattern.
+   *
+   * <p> Returns "*" when the window spans all 12 calendar months. Otherwise
+   * returns a brace list of the months in the window, every entry zero-padded to
+   * two digits so that it matches the "MM" component of the time format,
+   * e.g. "{11,12,01}".
+   *
+   * @param start beginning of the time window
+   * @param end end of the time window; assumed not to be before {@code start}
+   * @return the month glob options
+   */
+  private static StringBuilder buildMonthOptions(ZonedDateTime start,
+      ZonedDateTime end) {
+    int startMonth = start.getMonthValue();
+    int endMonth = end.getMonthValue();
+    int yearDiff = end.getYear() - start.getYear();
+    if (yearDiff > 1 || (yearDiff == 1 && endMonth >= startMonth)) {
+      // All 12 months
+      return new StringBuilder("*");
+    }
+    StringBuilder monthOptions = new StringBuilder("{");
+    // Zero-pad the first month the same way appendOptions pads the others;
+    // a bare "1" would never match the "01" produced by the "MM" format
+    if (startMonth < 10) {
+      monthOptions.append("0");
+    }
+    monthOptions.append(startMonth);
+    if (endMonth >= startMonth) {
+      appendOptions(monthOptions, startMonth + 1, endMonth);
+    } else {
+      // The window wraps over a year boundary:
+      // months [startMonth + 1, 12] of the start year
+      appendOptions(monthOptions, startMonth + 1, 12);
+      // months [1, endMonth] of the end year
+      appendOptions(monthOptions, 1, endMonth);
+    }
+    monthOptions.append("}");
+    return monthOptions;
+  }
+
+  private static void appendOptions(StringBuilder stringBuilder, int start, 
int end) {
+    for (int i = start; i <= end; i++) {
+      stringBuilder.append(",");
+      if (i < 10) {
+        stringBuilder.append("0");
+      }
+      stringBuilder.append(i);
+    }
+  }
+
+  @Override
+  public List<FileSystemDataset> findDatasets()
+      throws IOException {
+    try {
+      return doFindDatasets();
+    } finally {
+      // Recover ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY 
config
+      
this.props.setProperty(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY,
 datasetPattern);
+    }
+  }
+
+  private List<FileSystemDataset> doFindDatasets() throws IOException {
+    // Find datasets
+    List<FileSystemDataset> datasets = findDatasets(datasetPattern);
+
+    // Compute partitions in theory based on startTime and endTime
+    Set<String> computedPartitions = new HashSet<>();
+    datasets.forEach(dataset -> 
computedPartitions.addAll(computePartitions(dataset)));
+
+    // This is the final result
+    List<FileSystemDataset> resultPartitions = new 
ArrayList<>(computedPartitions.size());
+
+    // Find all physical dataset time partitions
+    List<FileSystemDataset> actualPartitions = 
findDatasets(datasetPartitionPattern);
+
+    String pathStr;
+    for (FileSystemDataset physicalPartition : actualPartitions) {
+      pathStr = physicalPartition.datasetRoot().toString();
+      if (computedPartitions.contains(pathStr)) {
+        resultPartitions.add(physicalPartition);
+        computedPartitions.remove(pathStr);
+      }
+    }
+
+    // Create virtual ones;
+    if (enableVirtualPartition) {
+      computedPartitions.forEach(partition -> {
+        log.info("Creating virtual partition {}", partition);
+        resultPartitions.add(new SimpleFileSystemDataset(new Path(partition), 
true));
+      });
+    } else {
+      log.info("Will not create virtual partitions");
+    }
+
+    return resultPartitions;
+  }
+
+  private Collection<String> computePartitions(FileSystemDataset dataset) {
+    List<String> partitions = new ArrayList<>();
+    TimeIterator iterator = new TimeIterator(startTime, endTime, granularity);
+    while (iterator.hasNext()) {
+      partitions.add(new Path(dataset.datasetRoot(),
+          partitionPrefix + timeFormatter.format(iterator.next())).toString());
+    }
+    return partitions;
+  }
+
+  private List<FileSystemDataset> findDatasets(String pattern)
+      throws IOException {
+    
this.props.setProperty(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY,
 pattern);
+    DefaultFileSystemGlobFinder datasetFinder = new 
DefaultFileSystemGlobFinder(this.fs, this.props);
+    return datasetFinder.findDatasets();
+  }
+
+  @Override
+  public Path commonDatasetRoot() {
+    return PathUtils.deepestNonGlobPath(new Path(this.datasetPattern));
+  }
+}
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
new file mode 100644
index 0000000..4630a7a
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.time;
+
+import java.time.ZonedDateTime;
+import java.util.Iterator;
+
+
+/**
+ * A {@link TimeIterator} iterates over time points within [{@code startTime}, {@code endTime}];
+ * a time point equal to {@code endTime} is still returned. Consecutive time points are one
+ * {@link Granularity} unit apart (see {@link Granularity} for the supported units).
+ */
+public class TimeIterator implements Iterator<ZonedDateTime> {
+
+  /** Step size between two consecutive time points. */
+  public enum Granularity {
+    MINUTE, HOUR, DAY, MONTH
+  }
+
+  // The next time point to hand out; advanced by one granularity unit on every next() call
+  private ZonedDateTime startTime;
+  private final ZonedDateTime endTime;
+  private final Granularity granularity;
+
+  /**
+   * @param startTime first time point to return
+   * @param endTime last time point that may be returned (inclusive)
+   * @param granularity step between consecutive time points
+   */
+  public TimeIterator(ZonedDateTime startTime, ZonedDateTime endTime, Granularity granularity) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.granularity = granularity;
+  }
+
+  /**
+   * @return true while the next time point is not after {@code endTime}
+   */
+  @Override
+  public boolean hasNext() {
+    return !startTime.isAfter(endTime);
+  }
+
+  /**
+   * @return the current time point; the iterator then advances by one granularity unit
+   * @throws java.util.NoSuchElementException if the iteration has passed {@code endTime}
+   */
+  @Override
+  public ZonedDateTime next() {
+    // Honor the Iterator contract instead of silently running past endTime
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException("No more time points up to " + endTime);
+    }
+
+    ZonedDateTime dateTime = startTime;
+
+    switch (granularity) {
+      case MINUTE:
+        startTime = startTime.plusMinutes(1);
+        break;
+      case HOUR:
+        startTime = startTime.plusHours(1);
+        break;
+      case DAY:
+        startTime = startTime.plusDays(1);
+        break;
+      case MONTH:
+        startTime = startTime.plusMonths(1);
+        break;
+    }
+
+    return dateTime;
+  }
+
+  /**
+   * Removal is not supported.
+   */
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
new file mode 100644
index 0000000..fca8bb3
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.dataset.FileSystemDataset;
+
+
+public class TimePartitionedGlobFinderTest {
+
+  private Path testRootDir;
+  private FileSystem localFs;
+  private ZoneId zone;
+  private ZonedDateTime curTime;
+
+  @BeforeClass
+  private void setup()
+      throws IOException {
+    localFs = FileSystem.getLocal(new Configuration());
+    testRootDir = new Path(Paths.get("").toAbsolutePath().toString(),
+        getClass().getSimpleName());
+    if (localFs.exists(testRootDir)) {
+      localFs.delete(testRootDir, true);
+    }
+    localFs.mkdirs(testRootDir);
+    localFs.deleteOnExit(testRootDir);
+    zone = ZoneId.of("America/Los_Angeles");
+    LocalDateTime localTime = LocalDateTime.of(2019,1,1,0,0);
+    curTime = ZonedDateTime.of(localTime, zone);
+  }
+
+  @Test
+  public void testDayPartitions()
+      throws IOException {
+    String hourlyFormat = "yyyy/MM/dd/HH";
+    String hourlyPrefix = "hourly/";
+    String dayFormat = "yyyy/MM/dd";
+
+    // create an empty dataset /db1/table1/hourly
+    Path ds1 = createDatasetPath("db1/table1");
+    // create dataset /db2/table2/hourly
+    Path ds2 = createDatasetPath("db2/table2");
+    createPartitions(ds1, hourlyPrefix,0, 2, hourlyFormat);
+    createPartitions(ds2, hourlyPrefix,-1, 2, hourlyFormat);
+
+    String datasetPattern = new Path(testRootDir, "*/*").toString();
+
+    // Test glob finder without creating empty partition
+    Properties props = new Properties();
+    props.setProperty("gobblin.dataset.pattern", datasetPattern);
+    props.setProperty("timePartitionGlobFinder.partitionPrefix", hourlyPrefix);
+    props.setProperty("timePartitionGlobFinder.timeFormat", dayFormat);
+    props.setProperty("timePartitionGlobFinder.lookbackSpec", "P2D");
+    props.setProperty("timePartitionGlobFinder.granularity", "DAY");
+    TimePartitionGlobFinder finder = new TimePartitionGlobFinder(localFs, 
props, curTime);
+    List<FileSystemDataset> datasets = finder.findDatasets();
+    Assert.assertEquals(datasets.size(), 2);
+    // Verify the single existing day partition of each dataset is found
+    Assert.assertNotNull(find(getPartitionPath(ds1, hourlyPrefix, 0, 
dayFormat), datasets));
+    Assert.assertNotNull(find(getPartitionPath(ds2, hourlyPrefix, -1, 
dayFormat), datasets));
+
+    // Test glob finder with creating empty partition
+    props.setProperty("timePartitionGlobFinder.enableVirtualPartition", 
"true");
+    finder = new TimePartitionGlobFinder(localFs, props, curTime);
+    datasets = finder.findDatasets();
+    Assert.assertEquals(datasets.size(), 6);
+    // Verify virtual partitions for /db1/table1
+    Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -1, dayFormat), 
datasets).getIsVirtual());
+    Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -2, dayFormat), 
datasets).getIsVirtual());
+    // Verify virtual partitions for /db2/table2
+    Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, 0, dayFormat), 
datasets).getIsVirtual());
+    Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, -2, dayFormat), 
datasets).getIsVirtual());
+  }
+
+  private Path getPartitionPath(Path dataset, String prefix, int dayOffset, 
String format) {
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
+    return new Path(dataset, prefix + 
formatter.format(curTime.plusDays(dayOffset)));
+  }
+
+  private SimpleFileSystemDataset find(Path path, List<FileSystemDataset> 
list) {
+    for (FileSystemDataset dataset : list) {
+      if (dataset.datasetRoot().equals(path)) {
+        return (SimpleFileSystemDataset)dataset;
+      }
+    }
+    return null;
+  }
+
+  private Path createDatasetPath(String dataset)
+      throws IOException {
+    Path datasetPath = new Path(testRootDir, dataset);
+    localFs.mkdirs(datasetPath);
+    return datasetPath;
+  }
+
+  private void createPartitions(Path dataset, String prefix, int dayOffset, 
int hours, String format)
+      throws IOException {
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
+    ZonedDateTime dayTime = curTime.plusDays(dayOffset);
+    for (int i = 0; i < hours; i++) {
+      Path hourPath = new Path(formatter.format(dayTime.withHour(i)));
+      Path datasetPartitionPath = new Path(dataset, prefix + hourPath);
+      Path dataFile = new Path(datasetPartitionPath, "dataFile");
+      try (OutputStream outputStream = localFs.create(dataFile, true)) {
+        outputStream.write(i);
+      }
+    }
+  }
+
+  @Test
+  public void testDerivePartitionPattern() {
+    String slashTimeFormat = "yyyy/MM/dd";
+    String dashTimeFormat = "yyyy-MM-dd";
+
+    // 2019/12/1 - 2019/12/3
+    LocalDateTime localTime = LocalDateTime.of(2019,12,3,0,0);
+    ZonedDateTime end = ZonedDateTime.of(localTime, zone);
+    ZonedDateTime start = end.withDayOfMonth(1);
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, slashTimeFormat),
+        "{2019}/{12}/*");
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, dashTimeFormat),
+        "{2019}-{12}*");
+
+    // 2019/11/30 - 2019/12/3
+    start = end.withMonth(11).withDayOfMonth(30);
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, slashTimeFormat),
+        "{2019}/{11,12}/*");
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, dashTimeFormat),
+        "{2019}-{11,12}*");
+
+    // 2018/12/1 - 2019/12/3
+    start = end.withYear(2018).withMonth(12).withDayOfMonth(1);
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, slashTimeFormat),
+        "{2018,2019}/*/*");
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, dashTimeFormat),
+        "{2018,2019}-*");
+
+    // 2018/11/30 - 2019/12/3
+    start = end.withYear(2018).withMonth(11).withDayOfMonth(30);
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, slashTimeFormat),
+        "{2018,2019}/*/*");
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, dashTimeFormat),
+        "{2018,2019}-*");
+
+    // 2018/11/30 - 2019/01/3
+    end = end.withMonth(1);
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, slashTimeFormat),
+        "{2018,2019}/{11,12,01}/*");
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, dashTimeFormat),
+        "{2018,2019}-{11,12,01}*");
+
+    // Test hourly
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, "yyyy/MM/dd/HH"),
+        "{2018,2019}/{11,12,01}/*/*");
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, "yyyy-MM-dd-HH"),
+        "{2018,2019}-{11,12,01}*");
+  }
+
+  @Test
+  public void testSupportedTimeFormat() {
+    
Assert.assertTrue(TimePartitionGlobFinder.isTimeFormatSupported("yyyy/MM/dd/HH"));
+    
Assert.assertTrue(TimePartitionGlobFinder.isTimeFormatSupported("yyyy/MM/dd"));
+    
Assert.assertFalse(TimePartitionGlobFinder.isTimeFormatSupported("MM/dd/yyyy"));
+    
Assert.assertTrue(TimePartitionGlobFinder.isTimeFormatSupported("yyyy-MM-dd"));
+    
Assert.assertTrue(TimePartitionGlobFinder.isTimeFormatSupported("yyyy-MM-dd-HH"));
+    
Assert.assertFalse(TimePartitionGlobFinder.isTimeFormatSupported("MM-dd-yyyy"));
+  }
+
+  @AfterClass
+  private void cleanup()
+      throws IOException {
+    if (localFs != null && localFs.exists(testRootDir)) {
+      localFs.delete(testRootDir, true);
+    }
+  }
+}

Reply via email to