This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1114284
[GOBBLIN-910][GOBBLIN-825][GOBBLIN-908][GOBBLIN-911][GOBBLIN-912][GOBBLIN-914][GOBBLIN-915]
Added a unix timestamp recursive copyable dataset finder
1114284 is described below
commit 11142844bfb60e276e54f1242aa75153cd531ef3
Author: vbohra <[email protected]>
AuthorDate: Fri Oct 18 10:43:08 2019 -0700
[GOBBLIN-910][GOBBLIN-825][GOBBLIN-908][GOBBLIN-911][GOBBLIN-912][GOBBLIN-914][GOBBLIN-915]
Added a unix timestamp recursive copyable dataset finder
Closes #2765 from vikrambohra/master
---
.../copy/UnixTimestampCopyableDatasetFinder.java | 41 +++++
.../UnixTimestampRecursiveCopyableDataset.java | 165 ++++++++++++++++++
.../UnixTimestampRecursiveCopyableDatasetTest.java | 190 +++++++++++++++++++++
3 files changed, 396 insertions(+)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampCopyableDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampCopyableDatasetFinder.java
new file mode 100644
index 0000000..baa4f9c
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampCopyableDatasetFinder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import java.io.IOException;
+import java.util.Properties;
+import
org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * {@link
org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder}
that returns datasets of type
+ * {@link
org.apache.gobblin.data.management.copy.TimeAwareRecursiveCopyableDataset}.N
+ */
+public class UnixTimestampCopyableDatasetFinder extends
ConfigurableGlobDatasetFinder<CopyableDataset> {
+
+ public UnixTimestampCopyableDatasetFinder(FileSystem fs, Properties props) {
+ super(fs, props);
+ }
+
+ @Override
+ public CopyableDataset datasetAtPath(Path path) throws IOException {
+ return new UnixTimestampRecursiveCopyableDataset(this.fs, path,
this.props, this.datasetPattern);
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
new file mode 100644
index 0000000..81613ab
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.copy;
+
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.joda.time.DateTimeZone;
import org.joda.time.LocalDate;
import org.joda.time.LocalDateTime;
import org.joda.time.Period;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import org.testng.collections.Lists;

import javafx.util.Pair;

import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.filters.AndPathFilter;
+
+
+/**
+ * This dataset filters file paths based on a {@link #timestampPattern} and
{@link #versionSelectionPolicy}
+ *
+ * The default regex will match the first occurrence of a directory matching
the pattern after the dataset root
+ *
+ */
+public class UnixTimestampRecursiveCopyableDataset extends
RecursiveCopyableDataset {
+
+ private static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX +
".recursive";
+ public static final String VERSION_SELECTION_POLICY = CONFIG_PREFIX +
".version.selection.policy";
+ public static final String TIMESTAMP_REGEEX = CONFIG_PREFIX +
".timestamp.pattern";
+ public static final String DEFAULT_TIMESTAMP_REGEX = ".*/([0-9]{13}).*/.*";
+ private final String lookbackTime;
+ private final Period lookbackPeriod;
+ private final LocalDateTime currentTime;
+ private final VersionSelectionPolicy versionSelectionPolicy;
+ private final DateTimeZone dateTimeZone;
+ private final Pattern timestampPattern;
+
+ public UnixTimestampRecursiveCopyableDataset(FileSystem fs, Path rootPath,
Properties properties, Path glob) {
+ super(fs, rootPath, properties, glob);
+ this.lookbackTime =
properties.getProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY);
+ this.versionSelectionPolicy =
+
VersionSelectionPolicy.valueOf(properties.getProperty(VERSION_SELECTION_POLICY).toUpperCase());
+ PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
+ this.lookbackPeriod = periodFormatter.parsePeriod(lookbackTime);
+ String timestampRegex = properties.getProperty(TIMESTAMP_REGEEX,
DEFAULT_TIMESTAMP_REGEX);
+ this.timestampPattern = Pattern.compile(timestampRegex);
+ this.dateTimeZone = DateTimeZone.forID(properties
+
.getProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_TIMEZONE_KEY,
+ TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+ this.currentTime = LocalDateTime.now(this.dateTimeZone);
+ }
+
+ private enum VersionSelectionPolicy {
+ EARLIEST, LATEST, ALL
+ }
+
+ /**
+ * Given a lookback period, this filter extracts the timestamp from the path
+ * based on {@link #timestampPattern} and filters out the paths that are out
the date range
+ *
+ */
+ class TimestampPathFilter implements PathFilter {
+
+ @Override
+ public boolean accept(Path path) {
+
+ LocalDate endDate = currentTime.toLocalDate();
+ LocalDate startDate = endDate.minus(lookbackPeriod);
+ Path relativePath =
PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(path),
datasetRoot());
+ Matcher matcher = timestampPattern.matcher(relativePath.toString());
+ if (!matcher.matches()) {
+ return false;
+ }
+ Long timestamp = Long.parseLong(matcher.group(1));
+ LocalDate dateOfTimestamp = new LocalDateTime(timestamp,
dateTimeZone).toLocalDate();
+ return !(dateOfTimestamp == null || dateOfTimestamp.isAfter(endDate) ||
dateOfTimestamp.isEqual(startDate)
+ || dateOfTimestamp.isBefore(startDate));
+ }
+ }
+
+ @Override
+ protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path,
PathFilter fileFilter)
+ throws IOException {
+
+ // Filter files by lookback period (fileNames >= startDate and fileNames
<= endDate)
+ PathFilter andPathFilter = new AndPathFilter(fileFilter, new
TimestampPathFilter());
+ List<FileStatus> files = super.getFilesAtPath(fs, path, andPathFilter);
+
+ if (VersionSelectionPolicy.ALL == versionSelectionPolicy) {
+ return files;
+ }
+
+ Map<Pair<String, LocalDate>, TreeMap<Long, List<FileStatus>>>
pathTimestampFilesMap = new HashMap<>();
+ // Now select files per day based on version selection policy
+ for (FileStatus fileStatus : files) {
+ String relativePath =
PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()),
datasetRoot()).toString();
+ Matcher matcher = timestampPattern.matcher(relativePath);
+ if (!matcher.matches()) {
+ continue;
+ }
+ String timestampStr = matcher.group(1);
+ String rootPath = relativePath.substring(0,
relativePath.indexOf(timestampStr));
+ Long unixTimestamp = Long.parseLong(timestampStr);
+ LocalDate localDate = new
LocalDateTime(unixTimestamp,dateTimeZone).toLocalDate();
+ Pair<String, LocalDate> key = new Pair<>(rootPath, localDate);
+ if (!pathTimestampFilesMap.containsKey(key)) {
+ pathTimestampFilesMap.put(key, new TreeMap<Long, List<FileStatus>>());
+ }
+ Map<Long, List<FileStatus>> timestampFilesMap =
pathTimestampFilesMap.get(key);
+
+ if (!timestampFilesMap.containsKey(unixTimestamp)) {
+ timestampFilesMap.put(unixTimestamp, Lists.newArrayList());
+ }
+ List<FileStatus> filesStatuses = timestampFilesMap.get(unixTimestamp);
+ filesStatuses.add(fileStatus);
+
+ }
+
+ List<FileStatus> result = new ArrayList<>();
+ for(TreeMap<Long, List<FileStatus>> timestampFileStatus :
pathTimestampFilesMap.values()) {
+ if(timestampFileStatus.size() <=0 ) {
+ continue;
+ }
+ switch (versionSelectionPolicy) {
+ case EARLIEST:
+ result.addAll(timestampFileStatus.firstEntry().getValue());
+ break;
+ case LATEST:
+ result.addAll(timestampFileStatus.lastEntry().getValue());
+ break;
+ default:
+ throw new RuntimeException("Unsupported version selection policy");
+ }
+ }
+ return result;
+ }
+}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDatasetTest.java
new file mode 100644
index 0000000..046eb1d
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDatasetTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.copy;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDateTime;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class UnixTimestampRecursiveCopyableDatasetTest {
+
+ String rootPath = "/tmp/src";
+ String databaseName = "dbName";
+ String tableName = "tableName";
+ String sourceDir = rootPath + "/" + databaseName + "/" + tableName;
+ private Path baseSrcDir;
+ private FileSystem fs;
+ private Path baseDstDir;
+
+ private static final String NUM_LOOKBACK_DAYS_STR = "2d";
+ private static final Integer MAX_NUM_DAILY_DIRS = 4;
+ private static final Integer NUM_DIRS_PER_DAY = 5;
+ private static final Integer NUM_FILES_PER_DIR = 3;
+
+ @BeforeClass
+ public void setUp()
+ throws IOException {
+
+ this.fs = FileSystem.getLocal(new Configuration());
+
+ baseSrcDir = new Path(sourceDir);
+ if (fs.exists(baseSrcDir)) {
+ fs.delete(baseSrcDir, true);
+ }
+ fs.mkdirs(baseSrcDir);
+
+ baseDstDir = new Path("/tmp/dst/dataset1/");
+ if (fs.exists(baseDstDir)) {
+ fs.delete(baseDstDir, true);
+ }
+ fs.mkdirs(baseDstDir);
+ }
+
+ @Test
+ public void testGetFilesAtPath()
+ throws IOException {
+ //1570544993735-PT-499913495
+
+ LocalDateTime endDate =
+
LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+
+ for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
+ for (int j = 0; j < NUM_DIRS_PER_DAY; j++) {
+ Path subDirPath =
+ new Path(baseSrcDir, new
Path(endDate.toDateTime().plusSeconds(60).getMillis() + "-PT-100000"));
+ fs.mkdirs(subDirPath);
+ for (int k = 0; k < NUM_FILES_PER_DIR; k++) {
+ Path filePath = new Path(subDirPath, k + ".avro");
+ fs.create(filePath);
+ }
+ endDate = endDate.minusMinutes(10);
+ }
+ endDate = endDate.minusDays(1);
+ }
+
+ PathFilter ACCEPT_ALL_PATH_FILTER = new PathFilter() {
+
+ @Override
+ public boolean accept(Path path) {
+ return true;
+ }
+ };
+
+ //
+ // Test db level copy, Qualifying Regex: ".*([0-9]{13})-PT-.*/.*", dataset
root = /tmp/src/databaseName
+ //
+ Properties properties = new Properties();
+ properties.setProperty("gobblin.dataset.pattern", sourceDir);
+
properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_TIMEZONE_KEY,
+ TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE);
+
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY,
NUM_LOOKBACK_DAYS_STR);
+
properties.setProperty(UnixTimestampRecursiveCopyableDataset.VERSION_SELECTION_POLICY,
"ALL");
+
properties.setProperty(UnixTimestampRecursiveCopyableDataset.TIMESTAMP_REGEEX,
".*/([0-9]{13})-PT-.*/.*");
+
+ UnixTimestampCopyableDatasetFinder finder = new
UnixTimestampCopyableDatasetFinder(fs, properties);
+
+ // Snap shot selection policy = ALL
+ String datasetRoot = rootPath + "/" + databaseName;
+ UnixTimestampRecursiveCopyableDataset dataset =
(UnixTimestampRecursiveCopyableDataset) finder.datasetAtPath(new
Path(datasetRoot));
+ List<FileStatus> fileStatusList = dataset.getFilesAtPath(fs, baseSrcDir,
ACCEPT_ALL_PATH_FILTER);
+ Assert.assertTrue(fileStatusList.size() == 30);
+
+ // version selection policy = EARLIEST
+
properties.setProperty(UnixTimestampRecursiveCopyableDataset.VERSION_SELECTION_POLICY,
"EARLIEST");
+ finder = new UnixTimestampCopyableDatasetFinder(fs, properties);
+ dataset = (UnixTimestampRecursiveCopyableDataset) finder.datasetAtPath(new
Path(datasetRoot));
+ fileStatusList = dataset.getFilesAtPath(fs, baseSrcDir,
ACCEPT_ALL_PATH_FILTER);
+ Assert.assertTrue(fileStatusList.size() == 6);
+
+ // version selection policy = LATEST
+
properties.setProperty(UnixTimestampRecursiveCopyableDataset.VERSION_SELECTION_POLICY,
"latest");
+ finder = new UnixTimestampCopyableDatasetFinder(fs, properties);
+ dataset = (UnixTimestampRecursiveCopyableDataset) finder.datasetAtPath(new
Path(datasetRoot));
+ fileStatusList = dataset.getFilesAtPath(fs, baseSrcDir,
ACCEPT_ALL_PATH_FILTER);
+ Assert.assertTrue(fileStatusList.size() == 6);
+
+
+ //
+ // Test table level copy, Qualifying Regex: ".*/([0-9]{13})-PT-.*/.*")\,
dataset root = /tmp/src/databaseName/tableName
+ //
+
properties.setProperty(UnixTimestampRecursiveCopyableDataset.TIMESTAMP_REGEEX,
"([0-9]{13})-PT-.*/.*");
+ finder = new UnixTimestampCopyableDatasetFinder(fs, properties);
+ datasetRoot = rootPath + "/" + databaseName + "/" + tableName;
+
+ // Snap shot selection policy = ALL
+
properties.setProperty(UnixTimestampRecursiveCopyableDataset.VERSION_SELECTION_POLICY,
"ALL");
+ dataset = (UnixTimestampRecursiveCopyableDataset) finder.datasetAtPath(new
Path(datasetRoot));
+ fileStatusList = dataset.getFilesAtPath(fs, baseSrcDir,
ACCEPT_ALL_PATH_FILTER);
+ Assert.assertTrue(fileStatusList.size() == 30);
+
+ // Snap shot selection policy = EARLIEST
+
properties.setProperty(UnixTimestampRecursiveCopyableDataset.VERSION_SELECTION_POLICY,
"EARLIEST");
+ finder = new UnixTimestampCopyableDatasetFinder(fs, properties);
+ dataset = (UnixTimestampRecursiveCopyableDataset) finder.datasetAtPath(new
Path(datasetRoot));
+ fileStatusList = dataset.getFilesAtPath(fs, baseSrcDir,
ACCEPT_ALL_PATH_FILTER);
+ Assert.assertTrue(fileStatusList.size() == 6);
+
+ // Snap shot selection policy = LATEST
+
properties.setProperty(UnixTimestampRecursiveCopyableDataset.VERSION_SELECTION_POLICY,
"latest");
+ finder = new UnixTimestampCopyableDatasetFinder(fs, properties);
+ dataset = (UnixTimestampRecursiveCopyableDataset) finder.datasetAtPath(new
Path(datasetRoot));
+ fileStatusList = dataset.getFilesAtPath(fs, baseSrcDir,
ACCEPT_ALL_PATH_FILTER);
+ Assert.assertTrue(fileStatusList.size() == 6);
+
+ }
+
+ @Test
+ public void testRegex() {
+ String dbRegex = ".*/([0-9]{13}).*/.*";
+ long now = System.currentTimeMillis();
+ String path = "tableName/" + now + "-PT-12345/part1.avro";
+ Pattern pattern = Pattern.compile(dbRegex);
+ Matcher matcher = pattern.matcher(path);
+ Assert.assertTrue(matcher.matches());
+ Assert.assertEquals(Long.parseLong(matcher.group(1)), now);
+
+ String tableRegex = "([0-9]{13}).*/.*";
+ path = now + "-PT-12345/part1.avro";
+ pattern = Pattern.compile(tableRegex);
+ matcher = pattern.matcher(path);
+ Assert.assertTrue(matcher.matches());
+ Assert.assertEquals(Long.parseLong(matcher.group(1)), now);
+ }
+
+ @AfterClass
+ public void clean()
+ throws IOException {
+ //Delete tmp directories
+ this.fs.delete(baseSrcDir, true);
+ this.fs.delete(baseDstDir, true);
+ }
+}