This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1114284  
[GOBBLIN-910][Gobblin-910][GOBBLIN-825][GOBBLIN-908][GOBBLIN-911][GOBBLIN-912][GOBBLIN-914][GOBBLIN-915]
 Added a unix timestamp recursive copyable dataset finder
1114284 is described below

commit 11142844bfb60e276e54f1242aa75153cd531ef3
Author: vbohra <[email protected]>
AuthorDate: Fri Oct 18 10:43:08 2019 -0700

    
[GOBBLIN-910][Gobblin-910][GOBBLIN-825][GOBBLIN-908][GOBBLIN-911][GOBBLIN-912][GOBBLIN-914][GOBBLIN-915]
 Added a unix timestamp recursive copyable dataset finder
    
    Closes #2765 from vikrambohra/master
---
 .../copy/UnixTimestampCopyableDatasetFinder.java   |  41 +++++
 .../UnixTimestampRecursiveCopyableDataset.java     | 165 ++++++++++++++++++
 .../UnixTimestampRecursiveCopyableDatasetTest.java | 190 +++++++++++++++++++++
 3 files changed, 396 insertions(+)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampCopyableDatasetFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampCopyableDatasetFinder.java
new file mode 100644
index 0000000..baa4f9c
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampCopyableDatasetFinder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import java.io.IOException;
+import java.util.Properties;
+import 
org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * A {@link ConfigurableGlobDatasetFinder} that returns datasets of type
+ * {@link UnixTimestampRecursiveCopyableDataset}.
+ */
+public class UnixTimestampCopyableDatasetFinder extends ConfigurableGlobDatasetFinder<CopyableDataset> {
+
+  /**
+   * Hands the file system and properties straight to {@link ConfigurableGlobDatasetFinder}.
+   *
+   * @param fs file system passed to the parent finder
+   * @param props properties passed to the parent finder
+   */
+  public UnixTimestampCopyableDatasetFinder(FileSystem fs, Properties props) {
+    super(fs, props);
+  }
+
+  /**
+   * Wraps {@code path} in a {@link UnixTimestampRecursiveCopyableDataset} built from this finder's file system,
+   * properties and dataset pattern.
+   *
+   * @param path root of the dataset to create
+   * @return a new dataset rooted at {@code path}
+   * @throws IOException declared by the signature; this body does not throw it itself
+   */
+  @Override
+  public CopyableDataset datasetAtPath(Path path) throws IOException {
+    return new UnixTimestampRecursiveCopyableDataset(this.fs, path, this.props, this.datasetPattern);
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
new file mode 100644
index 0000000..81613ab
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.copy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalDateTime;
+import org.joda.time.Period;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+import org.testng.collections.Lists;
+
+import javafx.util.Pair;
+
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filters.AndPathFilter;
+
+
+/**
+ * A {@link RecursiveCopyableDataset} that only returns files whose path, relative to the dataset root, carries a
+ * unix timestamp (epoch milliseconds) in capture group 1 of {@link #timestampPattern}.
+ *
+ * <p>A file is kept when the calendar date of its timestamp, evaluated in {@link #dateTimeZone}, is after
+ * {@code today - lookback} and not after {@code today}, where {@code today} is taken once, when the dataset is
+ * constructed. Unless the policy is {@code ALL}, the surviving files are then grouped by (path prefix before the
+ * timestamp, calendar date) and only the files under the earliest or latest timestamp of each group are returned,
+ * as selected by {@link #versionSelectionPolicy}.
+ *
+ * <p>With {@link #DEFAULT_TIMESTAMP_REGEX} the timestamp is a run of 13 digits directly following a {@code /}.
+ * The leading {@code .*} of that regex is greedy, so when several path components qualify the deepest one is the
+ * one captured.
+ */
+public class UnixTimestampRecursiveCopyableDataset extends RecursiveCopyableDataset {
+
+  private static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + ".recursive";
+  /** Property naming the version selection policy ({@code EARLIEST}, {@code LATEST} or {@code ALL}, any case). */
+  public static final String VERSION_SELECTION_POLICY = CONFIG_PREFIX + ".version.selection.policy";
+  /** Property holding the regex whose capture group 1 yields the timestamp; matched against the relative path. */
+  public static final String TIMESTAMP_REGEX = CONFIG_PREFIX + ".timestamp.pattern";
+  /**
+   * Misspelled alias of {@link #TIMESTAMP_REGEX}, kept so that existing callers keep compiling.
+   *
+   * @deprecated use {@link #TIMESTAMP_REGEX}
+   */
+  @Deprecated
+  public static final String TIMESTAMP_REGEEX = TIMESTAMP_REGEX;
+  /** Regex used when {@link #TIMESTAMP_REGEX} is not configured. */
+  public static final String DEFAULT_TIMESTAMP_REGEX = ".*/([0-9]{13}).*/.*";
+  private final String lookbackTime;
+  private final Period lookbackPeriod;
+  private final LocalDateTime currentTime;
+  private final VersionSelectionPolicy versionSelectionPolicy;
+  private final DateTimeZone dateTimeZone;
+  private final Pattern timestampPattern;
+
+  /**
+   * Reads the lookback period, version selection policy, timestamp regex and time zone from {@code properties}
+   * and records the current time in that zone as the upper bound of the lookback window.
+   *
+   * @param fs file system passed to {@link RecursiveCopyableDataset}
+   * @param rootPath dataset root passed to {@link RecursiveCopyableDataset}
+   * @param properties configuration; {@link TimeAwareRecursiveCopyableDataset#LOOKBACK_TIME_KEY} (days, written
+   *        like {@code 2d}) and {@link #VERSION_SELECTION_POLICY} are required
+   * @param glob dataset glob passed to {@link RecursiveCopyableDataset}
+   * @throws IllegalArgumentException if a required property is missing or the policy is not one of
+   *         {@code EARLIEST}, {@code LATEST}, {@code ALL}
+   */
+  public UnixTimestampRecursiveCopyableDataset(FileSystem fs, Path rootPath, Properties properties, Path glob) {
+    super(fs, rootPath, properties, glob);
+    this.lookbackTime = requireProperty(properties, TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY);
+    // Locale.ROOT keeps the enum lookup independent of the JVM default locale (e.g. the Turkish dotted I).
+    this.versionSelectionPolicy = VersionSelectionPolicy.valueOf(
+        requireProperty(properties, VERSION_SELECTION_POLICY).toUpperCase(java.util.Locale.ROOT));
+    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
+    this.lookbackPeriod = periodFormatter.parsePeriod(this.lookbackTime);
+    this.timestampPattern = Pattern.compile(properties.getProperty(TIMESTAMP_REGEX, DEFAULT_TIMESTAMP_REGEX));
+    this.dateTimeZone = DateTimeZone.forID(
+        properties.getProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_TIMEZONE_KEY,
+            TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+    this.currentTime = LocalDateTime.now(this.dateTimeZone);
+  }
+
+  /** Returns the value of {@code key}, failing with a message that names the key when it is not set. */
+  private static String requireProperty(Properties properties, String key) {
+    String value = properties.getProperty(key);
+    if (value == null) {
+      throw new IllegalArgumentException("Missing required property: " + key);
+    }
+    return value;
+  }
+
+  private enum VersionSelectionPolicy {
+    EARLIEST, LATEST, ALL
+  }
+
+  /**
+   * Accepts a path when {@link #timestampPattern} matches it (relative to the dataset root) and the calendar date
+   * of the captured timestamp lies inside the lookback window, i.e. after the start date and not after the end
+   * date. The start date itself is excluded.
+   */
+  class TimestampPathFilter implements PathFilter {
+
+    @Override
+    public boolean accept(Path path) {
+      Path relativePath =
+          PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(path), datasetRoot());
+      Matcher matcher = timestampPattern.matcher(relativePath.toString());
+      if (!matcher.matches()) {
+        return false;
+      }
+      LocalDate endDate = currentTime.toLocalDate();
+      LocalDate startDate = endDate.minus(lookbackPeriod);
+      long timestamp = Long.parseLong(matcher.group(1));
+      LocalDate dateOfTimestamp = new LocalDateTime(timestamp, dateTimeZone).toLocalDate();
+      return dateOfTimestamp.isAfter(startDate) && !dateOfTimestamp.isAfter(endDate);
+    }
+  }
+
+  /**
+   * Lists the files under {@code path} that pass both {@code fileFilter} and {@link TimestampPathFilter}, then
+   * applies the version selection policy.
+   *
+   * @param fs file system to list
+   * @param path directory to list
+   * @param fileFilter additional filter every returned file must pass
+   * @return all filtered files for {@code ALL}; otherwise, for every (path prefix, calendar date) group, the files
+   *         under the smallest ({@code EARLIEST}) or largest ({@code LATEST}) timestamp of that group
+   * @throws IOException if the underlying listing fails
+   */
+  @Override
+  protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter)
+      throws IOException {
+
+    // Filter files by lookback period (startDate < date of timestamp <= endDate)
+    PathFilter andPathFilter = new AndPathFilter(fileFilter, new TimestampPathFilter());
+    List<FileStatus> files = super.getFilesAtPath(fs, path, andPathFilter);
+
+    if (VersionSelectionPolicy.ALL == versionSelectionPolicy) {
+      return files;
+    }
+
+    // path prefix before the timestamp -> calendar date -> timestamp (sorted) -> files carrying that timestamp
+    Map<String, Map<LocalDate, TreeMap<Long, List<FileStatus>>>> filesByPrefixDateAndTimestamp = new HashMap<>();
+    for (FileStatus fileStatus : files) {
+      String relativePath = PathUtils.relativizePath(
+          PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()), datasetRoot()).toString();
+      Matcher matcher = timestampPattern.matcher(relativePath);
+      if (!matcher.matches()) {
+        continue;
+      }
+      // start(1) is the position of the captured group itself; searching for the digits with indexOf could land
+      // on an earlier, identical run of digits.
+      String pathPrefix = relativePath.substring(0, matcher.start(1));
+      long unixTimestamp = Long.parseLong(matcher.group(1));
+      LocalDate localDate = new LocalDateTime(unixTimestamp, dateTimeZone).toLocalDate();
+      filesByPrefixDateAndTimestamp
+          .computeIfAbsent(pathPrefix, prefix -> new HashMap<>())
+          .computeIfAbsent(localDate, date -> new TreeMap<>())
+          .computeIfAbsent(unixTimestamp, timestamp -> new ArrayList<>())
+          .add(fileStatus);
+    }
+
+    // Now select files per (prefix, day) based on version selection policy
+    List<FileStatus> result = new ArrayList<>();
+    for (Map<LocalDate, TreeMap<Long, List<FileStatus>>> filesByDate : filesByPrefixDateAndTimestamp.values()) {
+      for (TreeMap<Long, List<FileStatus>> filesByTimestamp : filesByDate.values()) {
+        switch (versionSelectionPolicy) {
+          case EARLIEST:
+            result.addAll(filesByTimestamp.firstEntry().getValue());
+            break;
+          case LATEST:
+            result.addAll(filesByTimestamp.lastEntry().getValue());
+            break;
+          default:
+            throw new RuntimeException("Unsupported version selection policy");
+        }
+      }
+    }
+    return result;
+  }
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDatasetTest.java
new file mode 100644
index 0000000..046eb1d
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDatasetTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.copy;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDateTime;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link UnixTimestampRecursiveCopyableDataset}: lookback filtering and version selection over a
+ * directory tree created on the local file system, plus a sanity check of the timestamp regexes.
+ */
+public class UnixTimestampRecursiveCopyableDatasetTest {
+
+  private static final int NUM_LOOKBACK_DAYS = 2;
+  private static final String NUM_LOOKBACK_DAYS_STR = NUM_LOOKBACK_DAYS + "d";
+  private static final int MAX_NUM_DAILY_DIRS = 4;
+  private static final int NUM_DIRS_PER_DAY = 5;
+  private static final int NUM_FILES_PER_DIR = 3;
+
+  // The dataset keeps dates in (today - lookback, today], i.e. NUM_LOOKBACK_DAYS calendar days.
+  private static final int EXPECTED_FILES_ALL = NUM_LOOKBACK_DAYS * NUM_DIRS_PER_DAY * NUM_FILES_PER_DIR;
+  // EARLIEST and LATEST keep a single timestamp directory per day.
+  private static final int EXPECTED_FILES_ONE_VERSION_PER_DAY = NUM_LOOKBACK_DAYS * NUM_FILES_PER_DIR;
+
+  private static final PathFilter ACCEPT_ALL_PATH_FILTER = new PathFilter() {
+
+    @Override
+    public boolean accept(Path path) {
+      return true;
+    }
+  };
+
+  private final String rootPath = "/tmp/src";
+  private final String databaseName = "dbName";
+  private final String tableName = "tableName";
+  private final String sourceDir = rootPath + "/" + databaseName + "/" + tableName;
+  private Path baseSrcDir;
+  private FileSystem fs;
+  private Path baseDstDir;
+
+  @BeforeClass
+  public void setUp()
+      throws IOException {
+
+    this.fs = FileSystem.getLocal(new Configuration());
+
+    baseSrcDir = new Path(sourceDir);
+    if (fs.exists(baseSrcDir)) {
+      fs.delete(baseSrcDir, true);
+    }
+    fs.mkdirs(baseSrcDir);
+
+    baseDstDir = new Path("/tmp/dst/dataset1/");
+    if (fs.exists(baseDstDir)) {
+      fs.delete(baseDstDir, true);
+    }
+    fs.mkdirs(baseDstDir);
+  }
+
+  @Test
+  public void testGetFilesAtPath()
+      throws IOException {
+
+    DateTimeZone zone = DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE);
+    // Anchor each day's directories at noon so that stepping back in 10 minute increments never crosses
+    // midnight, whatever time of day the test runs at.
+    LocalDateTime noonToday = LocalDateTime.now(zone).withTime(12, 0, 0, 0);
+
+    for (int day = 0; day < MAX_NUM_DAILY_DIRS; day++) {
+      for (int dir = 0; dir < NUM_DIRS_PER_DAY; dir++) {
+        // Convert with the same zone the dataset uses to read the timestamp back, not the JVM default zone.
+        long timestamp = noonToday.minusDays(day).minusMinutes(10 * dir).toDateTime(zone).getMillis();
+        // Directory names look like 1570544993735-PT-100000
+        Path subDirPath = new Path(baseSrcDir, timestamp + "-PT-100000");
+        fs.mkdirs(subDirPath);
+        for (int file = 0; file < NUM_FILES_PER_DIR; file++) {
+          // create() returns an open stream; close it right away, only an empty file is needed
+          fs.create(new Path(subDirPath, file + ".avro")).close();
+        }
+      }
+    }
+
+    Properties properties = new Properties();
+    properties.setProperty("gobblin.dataset.pattern", sourceDir);
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_TIMEZONE_KEY,
+        TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE);
+    properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_STR);
+
+    //
+    // Test db level copy, Qualifying Regex: ".*/([0-9]{13})-PT-.*/.*", dataset root = /tmp/src/dbName
+    //
+    String dbLevelRoot = rootPath + "/" + databaseName;
+    properties.setProperty(UnixTimestampRecursiveCopyableDataset.TIMESTAMP_REGEEX, ".*/([0-9]{13})-PT-.*/.*");
+    Assert.assertEquals(countSelectedFiles(properties, dbLevelRoot, "ALL"), EXPECTED_FILES_ALL);
+    Assert.assertEquals(countSelectedFiles(properties, dbLevelRoot, "EARLIEST"),
+        EXPECTED_FILES_ONE_VERSION_PER_DAY);
+    // lower case on purpose: the policy name is matched case-insensitively
+    Assert.assertEquals(countSelectedFiles(properties, dbLevelRoot, "latest"),
+        EXPECTED_FILES_ONE_VERSION_PER_DAY);
+
+    //
+    // Test table level copy, Qualifying Regex: "([0-9]{13})-PT-.*/.*", dataset root = /tmp/src/dbName/tableName
+    //
+    String tableLevelRoot = rootPath + "/" + databaseName + "/" + tableName;
+    properties.setProperty(UnixTimestampRecursiveCopyableDataset.TIMESTAMP_REGEEX, "([0-9]{13})-PT-.*/.*");
+    Assert.assertEquals(countSelectedFiles(properties, tableLevelRoot, "ALL"), EXPECTED_FILES_ALL);
+    Assert.assertEquals(countSelectedFiles(properties, tableLevelRoot, "EARLIEST"),
+        EXPECTED_FILES_ONE_VERSION_PER_DAY);
+    Assert.assertEquals(countSelectedFiles(properties, tableLevelRoot, "latest"),
+        EXPECTED_FILES_ONE_VERSION_PER_DAY);
+  }
+
+  /**
+   * Sets the version selection policy on {@code properties}, builds a fresh finder and dataset rooted at
+   * {@code datasetRoot}, and returns how many files the dataset selects under {@link #baseSrcDir}.
+   */
+  private int countSelectedFiles(Properties properties, String datasetRoot, String versionSelectionPolicy)
+      throws IOException {
+    properties.setProperty(UnixTimestampRecursiveCopyableDataset.VERSION_SELECTION_POLICY, versionSelectionPolicy);
+    UnixTimestampCopyableDatasetFinder finder = new UnixTimestampCopyableDatasetFinder(fs, properties);
+    UnixTimestampRecursiveCopyableDataset dataset =
+        (UnixTimestampRecursiveCopyableDataset) finder.datasetAtPath(new Path(datasetRoot));
+    List<FileStatus> fileStatusList = dataset.getFilesAtPath(fs, baseSrcDir, ACCEPT_ALL_PATH_FILTER);
+    return fileStatusList.size();
+  }
+
+  @Test
+  public void testRegex() {
+    long now = System.currentTimeMillis();
+
+    // Database-level layout: the timestamp directory sits below the table directory.
+    String path = "tableName/" + now + "-PT-12345/part1.avro";
+    Matcher matcher = Pattern.compile(UnixTimestampRecursiveCopyableDataset.DEFAULT_TIMESTAMP_REGEX).matcher(path);
+    Assert.assertTrue(matcher.matches());
+    Assert.assertEquals(Long.parseLong(matcher.group(1)), now);
+
+    // Table-level layout: the timestamp directory is the first path component.
+    String tableRegex = "([0-9]{13}).*/.*";
+    path = now + "-PT-12345/part1.avro";
+    matcher = Pattern.compile(tableRegex).matcher(path);
+    Assert.assertTrue(matcher.matches());
+    Assert.assertEquals(Long.parseLong(matcher.group(1)), now);
+  }
+
+  @AfterClass
+  public void clean()
+      throws IOException {
+    //Delete tmp directories
+    this.fs.delete(baseSrcDir, true);
+    this.fs.delete(baseDstDir, true);
+  }
+}

Reply via email to