This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c50b527ca Add new lookback version finder for use with iceberg retention (#3670)
c50b527ca is described below

commit c50b527caeba98785e552966016f15dbc614b4da
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Apr 11 15:47:13 2023 -0700

    Add new lookback version finder for use with iceberg retention (#3670)
---
 .../data/management/retention/DatasetCleaner.java  |   2 +-
 .../LookbackDateTimeDatasetVersionFinder.java      |  91 +++++++++++++++++++
 .../LookbackDateTimeDatasetVersionFinderTest.java  | 101 +++++++++++++++++++++
 3 files changed, 193 insertions(+), 1 deletion(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
index 30da80fe2..13841ce45 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
@@ -176,7 +176,7 @@ public class DatasetCleaner implements Instrumentable, 
Closeable {
   @Override
   public void close() throws IOException {
     try {
-      if (this.finishCleanSignal.isPresent()) {
+      if (this.finishCleanSignal != null && 
this.finishCleanSignal.isPresent()) {
         this.finishCleanSignal.get().await();
       }
       if (!this.throwables.isEmpty()) {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/LookbackDateTimeDatasetVersionFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/LookbackDateTimeDatasetVersionFinder.java
new file mode 100644
index 000000000..dd7f4f700
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/LookbackDateTimeDatasetVersionFinder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.version.finder;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.Period;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * {@link DatasetVersionFinder} that constructs {@link 
TimestampedDatasetVersion}s without actually checking for existence
+ * of the version path. The version path is constructed by appending the 
version partition pattern to the dataset root.
+ * The versions are found by looking back a specific period of time and 
finding unique date partitions between that
+ * time and the current time. Lookback is supported to hourly granularity.
+ */
+public class LookbackDateTimeDatasetVersionFinder extends 
DateTimeDatasetVersionFinder {
+  public static final String VERSION_PATH_PREFIX = "version.path.prefix";
+  public static final String VERSION_LOOKBACK_PERIOD = 
"version.lookback.period";
+
+  private final Duration stepDuration;
+  private final Period lookbackPeriod;
+  private final String pathPrefix;
+  private final Instant endTime;
+
+  public LookbackDateTimeDatasetVersionFinder(FileSystem fs, Config config) {
+    this(fs, config, Instant.now());
+  }
+
+  @VisibleForTesting
+  public LookbackDateTimeDatasetVersionFinder(FileSystem fs, Config config, 
Instant endTime) {
+    super(fs, config);
+    Preconditions.checkArgument(config.hasPath(VERSION_LOOKBACK_PERIOD) , 
"Missing required property " + VERSION_LOOKBACK_PERIOD);
+    PeriodFormatter periodFormatter =
+        new 
PeriodFormatterBuilder().appendYears().appendSuffix("y").appendMonths().appendSuffix("M").appendDays()
+            .appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+    this.stepDuration = Duration.standardHours(1);
+    this.pathPrefix = ConfigUtils.getString(config, VERSION_PATH_PREFIX, "");
+    this.lookbackPeriod = 
periodFormatter.parsePeriod(config.getString(VERSION_LOOKBACK_PERIOD));
+    this.endTime = endTime;
+  }
+
+  @Override
+  public Collection<TimestampedDatasetVersion> findDatasetVersions(Dataset 
dataset) throws IOException {
+    FileSystemDataset fsDataset = (FileSystemDataset) dataset;
+    Set<TimestampedDatasetVersion> versions = new HashSet<>();
+    Instant startTime = endTime.minus(lookbackPeriod.toStandardDuration());
+
+    for (Instant time = startTime; !time.isAfter(endTime); time = 
time.plus(stepDuration)) {
+      String truncatedTime = formatter.print(time);
+      DateTime versionTime = formatter.parseDateTime(truncatedTime);
+      Path versionPath = new Path(fsDataset.datasetRoot(), new 
Path(pathPrefix, truncatedTime));
+      versions.add(new TimestampedDatasetVersion(versionTime, versionPath));
+    }
+
+    return versions;
+  }
+}
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/LookbackDateTimeDatasetVersionFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/LookbackDateTimeDatasetVersionFinderTest.java
new file mode 100644
index 000000000..6040ed325
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/LookbackDateTimeDatasetVersionFinderTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.version.finder;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Tests for {@link LookbackDateTimeDatasetVersionFinder} using a fixed end time so the generated versions are
+ * deterministic.
+ */
+@Test(groups = { "gobblin.data.management.version" })
+public class LookbackDateTimeDatasetVersionFinderTest {
+
+  // Hour-granularity formatter used to render version timestamps in the assertions below.
+  // NOTE(review): the expected values assume the finder's own formatter also uses this time zone - confirm
+  // against DateTimeDatasetVersionFinder's default.
+  private final DateTimeFormatter formatter =
+      DateTimeFormat.forPattern("yyyy/MM/dd/HH").withZone(DateTimeZone.forID(ConfigurationKeys.PST_TIMEZONE_NAME));
+  private final Instant fixedTime = Instant.parse("2023-01-01T12:30:00.000-08:00");
+
+  @Test
+  public void testHourlyVersions() throws Exception {
+    List<TimestampedDatasetVersion> sortedVersions = findSortedVersions("yyyy/MM/dd/HH", "hourly", "96h");
+
+    // 96 hourly steps back from the end time, plus the end hour itself.
+    Assert.assertEquals(sortedVersions.size(), 97);
+    TimestampedDatasetVersion oldest = sortedVersions.get(0);
+    TimestampedDatasetVersion newest = sortedVersions.get(sortedVersions.size() - 1);
+    Assert.assertEquals(oldest.getVersion().toString(formatter), "2022/12/28/12");
+    Assert.assertEquals(oldest.getPath().toString(), "/data/Dataset1/hourly/2022/12/28/12");
+    Assert.assertEquals(newest.getVersion().toString(formatter), "2023/01/01/12");
+    Assert.assertEquals(newest.getPath().toString(), "/data/Dataset1/hourly/2023/01/01/12");
+  }
+
+  @Test
+  public void testDailyVersions() throws Exception {
+    List<TimestampedDatasetVersion> sortedVersions = findSortedVersions("yyyy/MM/dd", "daily", "366d");
+
+    // 366 days back from the end time, plus the end day itself; hourly steps collapse into one version per day.
+    Assert.assertEquals(sortedVersions.size(), 367);
+    TimestampedDatasetVersion oldest = sortedVersions.get(0);
+    TimestampedDatasetVersion newest = sortedVersions.get(sortedVersions.size() - 1);
+    // The daily pattern carries no hour, so the version timestamps render with hour "00".
+    Assert.assertEquals(oldest.getVersion().toString(formatter), "2021/12/31/00");
+    Assert.assertEquals(oldest.getPath().toString(), "/data/Dataset1/daily/2021/12/31");
+    Assert.assertEquals(newest.getVersion().toString(formatter), "2023/01/01/00");
+    Assert.assertEquals(newest.getPath().toString(), "/data/Dataset1/daily/2023/01/01");
+  }
+
+  /**
+   * Builds a finder from the given settings, runs it against {@code /data/Dataset1} with {@link #fixedTime} as the
+   * end time, and returns the versions in their natural sort order.
+   */
+  private List<TimestampedDatasetVersion> findSortedVersions(String dateTimePattern, String pathPrefix,
+      String lookbackPeriod) throws Exception {
+    Properties properties = new Properties();
+    properties.put(DateTimeDatasetVersionFinder.DATE_TIME_PATTERN_KEY, dateTimePattern);
+    properties.put(LookbackDateTimeDatasetVersionFinder.VERSION_PATH_PREFIX, pathPrefix);
+    properties.put(LookbackDateTimeDatasetVersionFinder.VERSION_LOOKBACK_PERIOD, lookbackPeriod);
+
+    LookbackDateTimeDatasetVersionFinder versionFinder = new LookbackDateTimeDatasetVersionFinder(
+        FileSystem.getLocal(new Configuration()), ConfigUtils.propertiesToConfig(properties), fixedTime);
+    Dataset dataset = new TestDataset(new Path("/data/Dataset1"));
+    Collection<TimestampedDatasetVersion> datasetVersions = versionFinder.findDatasetVersions(dataset);
+    return datasetVersions.stream().sorted().collect(Collectors.toList());
+  }
+}
+
+/** Minimal {@link FileSystemDataset} for tests: exposes a fixed root path and has no URN. */
+class TestDataset implements FileSystemDataset {
+  private final Path datasetRoot;
+
+  public TestDataset(Path datasetRoot) {
+    this.datasetRoot = datasetRoot;
+  }
+
+  /** Returns the root path supplied at construction. */
+  @Override
+  public Path datasetRoot() {
+    return datasetRoot;
+  }
+
+  /** Always returns {@code null}; this stub does not define a URN. */
+  @Override
+  public String datasetURN() {
+    return null;
+  }
+}

Reply via email to