This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c50b527ca Add new lookback version finder for use with iceberg
retention (#3670)
c50b527ca is described below
commit c50b527caeba98785e552966016f15dbc614b4da
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Apr 11 15:47:13 2023 -0700
Add new lookback version finder for use with iceberg retention (#3670)
---
.../data/management/retention/DatasetCleaner.java | 2 +-
.../LookbackDateTimeDatasetVersionFinder.java | 91 +++++++++++++++++++
.../LookbackDateTimeDatasetVersionFinderTest.java | 101 +++++++++++++++++++++
3 files changed, 193 insertions(+), 1 deletion(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
index 30da80fe2..13841ce45 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
@@ -176,7 +176,7 @@ public class DatasetCleaner implements Instrumentable,
Closeable {
@Override
public void close() throws IOException {
try {
- if (this.finishCleanSignal.isPresent()) {
+ if (this.finishCleanSignal != null &&
this.finishCleanSignal.isPresent()) {
this.finishCleanSignal.get().await();
}
if (!this.throwables.isEmpty()) {
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/LookbackDateTimeDatasetVersionFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/LookbackDateTimeDatasetVersionFinder.java
new file mode 100644
index 000000000..dd7f4f700
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/LookbackDateTimeDatasetVersionFinder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.version.finder;
+
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Period;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;

import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * {@link DatasetVersionFinder} that constructs {@link
TimestampedDatasetVersion}s without actually checking for existence
+ * of the version path. The version path is constructed by appending the
version partition pattern to the dataset root.
+ * The versions are found by looking back a specific period of time and
finding unique date partitions between that
+ * time and the current time. Lookback is supported to hourly granularity.
+ */
+public class LookbackDateTimeDatasetVersionFinder extends
DateTimeDatasetVersionFinder {
+ public static final String VERSION_PATH_PREFIX = "version.path.prefix";
+ public static final String VERSION_LOOKBACK_PERIOD =
"version.lookback.period";
+
+ private final Duration stepDuration;
+ private final Period lookbackPeriod;
+ private final String pathPrefix;
+ private final Instant endTime;
+
+ public LookbackDateTimeDatasetVersionFinder(FileSystem fs, Config config) {
+ this(fs, config, Instant.now());
+ }
+
+ @VisibleForTesting
+ public LookbackDateTimeDatasetVersionFinder(FileSystem fs, Config config,
Instant endTime) {
+ super(fs, config);
+ Preconditions.checkArgument(config.hasPath(VERSION_LOOKBACK_PERIOD) ,
"Missing required property " + VERSION_LOOKBACK_PERIOD);
+ PeriodFormatter periodFormatter =
+ new
PeriodFormatterBuilder().appendYears().appendSuffix("y").appendMonths().appendSuffix("M").appendDays()
+ .appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+ this.stepDuration = Duration.standardHours(1);
+ this.pathPrefix = ConfigUtils.getString(config, VERSION_PATH_PREFIX, "");
+ this.lookbackPeriod =
periodFormatter.parsePeriod(config.getString(VERSION_LOOKBACK_PERIOD));
+ this.endTime = endTime;
+ }
+
+ @Override
+ public Collection<TimestampedDatasetVersion> findDatasetVersions(Dataset
dataset) throws IOException {
+ FileSystemDataset fsDataset = (FileSystemDataset) dataset;
+ Set<TimestampedDatasetVersion> versions = new HashSet<>();
+ Instant startTime = endTime.minus(lookbackPeriod.toStandardDuration());
+
+ for (Instant time = startTime; !time.isAfter(endTime); time =
time.plus(stepDuration)) {
+ String truncatedTime = formatter.print(time);
+ DateTime versionTime = formatter.parseDateTime(truncatedTime);
+ Path versionPath = new Path(fsDataset.datasetRoot(), new
Path(pathPrefix, truncatedTime));
+ versions.add(new TimestampedDatasetVersion(versionTime, versionPath));
+ }
+
+ return versions;
+ }
+}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/LookbackDateTimeDatasetVersionFinderTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/LookbackDateTimeDatasetVersionFinderTest.java
new file mode 100644
index 000000000..6040ed325
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/LookbackDateTimeDatasetVersionFinderTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.version.finder;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Test(groups = { "gobblin.data.management.version" })
+public class LookbackDateTimeDatasetVersionFinderTest {
+
+ private FileSystem fs;
+ private DateTimeFormatter formatter =
DateTimeFormat.forPattern("yyyy/MM/dd/HH").withZone(DateTimeZone.forID(ConfigurationKeys.PST_TIMEZONE_NAME));
+ private final Instant fixedTime =
Instant.parse("2023-01-01T12:30:00.000-08:00");
+
+ @Test
+ public void testHourlyVersions() throws Exception {
+ Properties properties = new Properties();
+ properties.put(DateTimeDatasetVersionFinder.DATE_TIME_PATTERN_KEY,
"yyyy/MM/dd/HH");
+ properties.put(LookbackDateTimeDatasetVersionFinder.VERSION_PATH_PREFIX,
"hourly");
+
properties.put(LookbackDateTimeDatasetVersionFinder.VERSION_LOOKBACK_PERIOD,
"96h");
+
+ LookbackDateTimeDatasetVersionFinder versionFinder = new
LookbackDateTimeDatasetVersionFinder(FileSystem.getLocal(new Configuration()),
+ ConfigUtils.propertiesToConfig(properties), fixedTime);
+ Dataset dataset = new TestDataset(new Path("/data/Dataset1"));
+ Collection<TimestampedDatasetVersion> datasetVersions =
versionFinder.findDatasetVersions(dataset);
+ List<TimestampedDatasetVersion> sortedVersions =
datasetVersions.stream().sorted().collect(Collectors.toList());
+ Assert.assertEquals(datasetVersions.size(), 97);
+
Assert.assertEquals(sortedVersions.get(0).getVersion().toString(formatter),
"2022/12/28/12");
+ Assert.assertEquals(sortedVersions.get(0).getPath().toString(),
"/data/Dataset1/hourly/2022/12/28/12");
+ Assert.assertEquals(sortedVersions.get(sortedVersions.size() -
1).getVersion().toString(formatter), "2023/01/01/12");
+ Assert.assertEquals(sortedVersions.get(sortedVersions.size() -
1).getPath().toString(), "/data/Dataset1/hourly/2023/01/01/12");
+ }
+
+ @Test
+ public void testDailyVersions() throws Exception {
+ Properties properties = new Properties();
+ properties.put(DateTimeDatasetVersionFinder.DATE_TIME_PATTERN_KEY,
"yyyy/MM/dd");
+ properties.put(LookbackDateTimeDatasetVersionFinder.VERSION_PATH_PREFIX,
"daily");
+
properties.put(LookbackDateTimeDatasetVersionFinder.VERSION_LOOKBACK_PERIOD,
"366d");
+
+ LookbackDateTimeDatasetVersionFinder versionFinder = new
LookbackDateTimeDatasetVersionFinder(FileSystem.getLocal(new Configuration()),
+ ConfigUtils.propertiesToConfig(properties), fixedTime);
+ Dataset dataset = new TestDataset(new Path("/data/Dataset1"));
+ Collection<TimestampedDatasetVersion> datasetVersions =
versionFinder.findDatasetVersions(dataset);
+ List<TimestampedDatasetVersion> sortedVersions =
datasetVersions.stream().sorted().collect(Collectors.toList());
+ Assert.assertEquals(datasetVersions.size(), 367);
+
Assert.assertEquals(sortedVersions.get(0).getVersion().toString(formatter),
"2021/12/31/00");
+ Assert.assertEquals(sortedVersions.get(0).getPath().toString(),
"/data/Dataset1/daily/2021/12/31");
+ Assert.assertEquals(sortedVersions.get(sortedVersions.size() -
1).getVersion().toString(formatter), "2023/01/01/00");
+ Assert.assertEquals(sortedVersions.get(sortedVersions.size() -
1).getPath().toString(), "/data/Dataset1/daily/2023/01/01");
+ }
+}
+
+class TestDataset implements FileSystemDataset {
+ private final Path datasetRoot;
+
+ public TestDataset(Path datasetRoot) {
+ this.datasetRoot = datasetRoot;
+ }
+
+ public Path datasetRoot() {
+ return datasetRoot;
+ }
+
+ public String datasetURN() {
+ return null;
+ }
+}