Repository: incubator-gobblin Updated Branches: refs/heads/master e67799948 -> 70cbe91b9
[GOBBLIN-200] Add cleanable state store dataset Closes #2097 from jack-moseley/state_store_cleaner Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/70cbe91b Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/70cbe91b Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/70cbe91b Branch: refs/heads/master Commit: 70cbe91b9ff2e3d83dae8f36c4abb4a2cbbc425b Parents: e677999 Author: Jack Moseley <[email protected]> Authored: Fri Sep 29 10:39:06 2017 -0700 Committer: Issac Buenrostro <[email protected]> Committed: Fri Sep 29 10:39:06 2017 -0700 ---------------------------------------------------------------------- .../data/management/dataset/DummyDataset.java | 3 +- .../retention/dataset/CleanableDataset.java | 6 +- .../dataset/CleanableDatasetStoreDataset.java | 59 +++++++++++++++++ .../retention/dataset/CleanableHiveDataset.java | 3 +- .../dataset/TimeBasedDatasetStoreDataset.java | 57 ++++++++++++++++ .../TimeBasedDatasetStoreDatasetFinder.java | 49 ++++++++++++++ .../version/DatasetStateStoreVersion.java | 30 +++++++++ .../TimestampedDatasetStateStoreVersion.java | 52 +++++++++++++++ ...mestampedDatasetStateStoreVersionFinder.java | 51 +++++++++++++++ .../CleanableDatasetStoreDatasetTest.java | 69 ++++++++++++++++++++ .../main/resources/state-store-retention.pull | 25 +++++++ .../metadata/StateStoreEntryManager.java | 2 +- .../CleanableHivePartitionDataset.java | 3 +- 13 files changed, 401 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DummyDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DummyDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DummyDataset.java index 3d7cc7d..0c0ab35 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DummyDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DummyDataset.java @@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor; import java.io.IOException; import java.util.Collection; +import org.apache.gobblin.dataset.FileSystemDataset; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -37,7 +38,7 @@ import org.apache.gobblin.data.management.retention.dataset.CleanableDataset; * Dummy {@link Dataset} that does nothing. */ @RequiredArgsConstructor -public class DummyDataset implements CopyableDataset, CleanableDataset { +public class DummyDataset implements CopyableDataset, CleanableDataset, FileSystemDataset { private final Path datasetRoot; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDataset.java index 033d0aa..b2e55c4 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDataset.java @@ -17,17 +17,15 @@ package org.apache.gobblin.data.management.retention.dataset; -import org.apache.gobblin.dataset.Dataset; -import org.apache.gobblin.dataset.FileSystemDataset; - import java.io.IOException; +import org.apache.gobblin.dataset.Dataset; /** * An abstraction for a set of files where a simple {@link org.apache.gobblin.data.management.retention.policy.RetentionPolicy} * can be applied. */ -public interface CleanableDataset extends Dataset, FileSystemDataset { +public interface CleanableDataset extends Dataset { /** * Cleans the {@link CleanableDataset}. In general, this means to apply a http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetStoreDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetStoreDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetStoreDataset.java new file mode 100644 index 0000000..9f29c9a --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableDatasetStoreDataset.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.retention.dataset; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.gobblin.data.management.policy.VersionSelectionPolicy; +import org.apache.gobblin.data.management.version.DatasetStateStoreVersion; +import org.apache.gobblin.data.management.version.DatasetVersion; +import org.apache.gobblin.data.management.version.finder.VersionFinder; +import org.apache.gobblin.metastore.DatasetStoreDataset; +import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; +import com.google.common.collect.Lists; + + +/** + * A cleanable {@link DatasetStoreDataset} + */ +public abstract class CleanableDatasetStoreDataset<T extends DatasetVersion> extends DatasetStoreDataset implements CleanableDataset { + + public CleanableDatasetStoreDataset(DatasetStoreDataset.Key key, List<DatasetStateStoreEntryManager> entries) { + super(key, entries); + } + + public abstract VersionFinder<? extends T> getVersionFinder(); + + public abstract VersionSelectionPolicy<T> getVersionSelectionPolicy(); + + @Override + public void clean() throws IOException { + + List<T> versions = Lists.newArrayList(this.getVersionFinder().findDatasetVersions(this)); + + Collections.sort(versions, Collections.reverseOrder()); + + Collection<T> deletableVersions = this.getVersionSelectionPolicy().listSelectedVersions(versions); + + for (Object version : deletableVersions) { + ((DatasetStateStoreVersion) version).getEntry().delete(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableHiveDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableHiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableHiveDataset.java index 282a2a6..97cec4c 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableHiveDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableHiveDataset.java @@ -28,6 +28,7 @@ import java.util.Set; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.dataset.FileSystemDataset; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -66,7 +67,7 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils; @Slf4j @SuppressWarnings({ "rawtypes", "unchecked" }) @Getter -public class CleanableHiveDataset extends HiveDataset implements CleanableDataset { +public class CleanableHiveDataset extends HiveDataset implements CleanableDataset, FileSystemDataset { private static final String SHOULD_DELETE_DATA_KEY = "gobblin.retention.hive.shouldDeleteData"; private static final String SHOULD_DELETE_DATA_DEFAULT = Boolean.toString(false); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java new file mode 100644 index 0000000..0fa7c18 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.retention.dataset; + +import java.util.List; +import java.util.Properties; +import org.apache.gobblin.data.management.policy.SelectBeforeTimeBasedPolicy; +import org.apache.gobblin.data.management.policy.VersionSelectionPolicy; +import org.apache.gobblin.data.management.version.TimestampedDatasetStateStoreVersion; +import org.apache.gobblin.data.management.version.TimestampedDatasetVersion; +import org.apache.gobblin.data.management.version.finder.TimestampedDatasetStateStoreVersionFinder; +import org.apache.gobblin.data.management.version.finder.VersionFinder; +import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; +import org.apache.gobblin.util.ConfigUtils; +import lombok.Data; + + +/** + * A {@link CleanableDatasetStoreDataset} that deletes entries before a certain time + */ +@Data +public class TimeBasedDatasetStoreDataset extends CleanableDatasetStoreDataset<TimestampedDatasetVersion> { + + private final VersionFinder<TimestampedDatasetStateStoreVersion> versionFinder; + private final VersionSelectionPolicy<TimestampedDatasetVersion> versionSelectionPolicy; + + public TimeBasedDatasetStoreDataset(Key key, List<DatasetStateStoreEntryManager> entries, Properties props) { + super(key, entries); + this.versionFinder = new TimestampedDatasetStateStoreVersionFinder(); + this.versionSelectionPolicy = new SelectBeforeTimeBasedPolicy(ConfigUtils.propertiesToConfig(props)); + } + + @Override + public VersionFinder<TimestampedDatasetStateStoreVersion> getVersionFinder() { + return this.versionFinder; + } + + @Override + public VersionSelectionPolicy<TimestampedDatasetVersion> getVersionSelectionPolicy() { + return this.versionSelectionPolicy; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/finder/TimeBasedDatasetStoreDatasetFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/finder/TimeBasedDatasetStoreDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/finder/TimeBasedDatasetStoreDatasetFinder.java new file mode 100644 index 0000000..eeb92aa --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/finder/TimeBasedDatasetStoreDatasetFinder.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.retention.dataset.finder; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; +import org.apache.gobblin.data.management.retention.dataset.CleanableDatasetStoreDataset; +import org.apache.gobblin.data.management.retention.dataset.TimeBasedDatasetStoreDataset; +import org.apache.gobblin.metastore.DatasetStoreDataset; +import org.apache.gobblin.metastore.DatasetStoreDatasetFinder; +import org.apache.hadoop.fs.FileSystem; + + +/** + * A {@link DatasetStoreDatasetFinder} that returns {@link CleanableDatasetStoreDataset} + */ +public class TimeBasedDatasetStoreDatasetFinder extends DatasetStoreDatasetFinder { + + private Properties props; + + public TimeBasedDatasetStoreDatasetFinder(FileSystem fs, Properties props) throws IOException { + super(fs, props); + this.props = props; + } + + @Override + public List<DatasetStoreDataset> findDatasets() throws IOException { + return super.findDatasets().stream() + .map(dataset -> new TimeBasedDatasetStoreDataset(dataset.getKey(), dataset.getDatasetStateStoreMetadataEntries(), props)) + .collect(Collectors.toList()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/DatasetStateStoreVersion.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/DatasetStateStoreVersion.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/DatasetStateStoreVersion.java new file mode 100644 index 0000000..47c161d --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/DatasetStateStoreVersion.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.version; + +import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; + + +/** + * {@link DatasetVersion} that has a {@link DatasetStateStoreEntryManager} + */ +public interface DatasetStateStoreVersion extends DatasetVersion { + + DatasetStateStoreEntryManager getEntry(); + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java new file mode 100644 index 0000000..45a153e --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.version; + +import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; +import org.joda.time.DateTime; +import lombok.Getter; + +/** + * {@link TimestampedDatasetVersion} that has a {@link DatasetStateStoreEntryManager} + */ +@Getter +public class TimestampedDatasetStateStoreVersion extends TimestampedDatasetVersion implements DatasetStateStoreVersion { + + private final DatasetStateStoreEntryManager entry; + + public TimestampedDatasetStateStoreVersion(DatasetStateStoreEntryManager entry) { + super(new DateTime(entry.getTimestamp()), null); + this.entry = entry; + } + + @Override + public int compareTo(FileSystemDatasetVersion other) { + TimestampedDatasetVersion otherAsDateTime = (TimestampedDatasetVersion) other; + return this.version.equals(otherAsDateTime.version) ? 0 : this.version.compareTo(otherAsDateTime.version); + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + + @Override + public int hashCode() { + return this.version.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/TimestampedDatasetStateStoreVersionFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/TimestampedDatasetStateStoreVersionFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/TimestampedDatasetStateStoreVersionFinder.java new file mode 100644 index 0000000..3f9b0b5 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/TimestampedDatasetStateStoreVersionFinder.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.version.finder; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import org.apache.gobblin.data.management.version.FileSystemDatasetVersion; +import org.apache.gobblin.data.management.version.TimestampedDatasetStateStoreVersion; +import org.apache.gobblin.dataset.Dataset; +import org.apache.gobblin.metastore.DatasetStoreDataset; +import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; +import com.google.common.collect.Lists; + + +/** + * {@link VersionFinder} for {@link TimestampedDatasetStateStoreVersion} + */ +public class TimestampedDatasetStateStoreVersionFinder implements VersionFinder<TimestampedDatasetStateStoreVersion> { + + @Override + public Class<? extends FileSystemDatasetVersion> versionClass() { + return TimestampedDatasetStateStoreVersion.class; + } + + @Override + public Collection<TimestampedDatasetStateStoreVersion> findDatasetVersions(Dataset dataset) throws IOException { + DatasetStoreDataset storeDataset = ((DatasetStoreDataset) dataset); + List<TimestampedDatasetStateStoreVersion> versions = Lists.newArrayList(); + + for (DatasetStateStoreEntryManager entry : storeDataset.getDatasetStateStoreMetadataEntries()) { + versions.add(new TimestampedDatasetStateStoreVersion(entry)); + } + return versions; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetStoreDatasetTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetStoreDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetStoreDatasetTest.java new file mode 100644 index 0000000..43dbd8a --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetStoreDatasetTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.retention; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.data.management.retention.dataset.CleanableDataset; +import org.apache.gobblin.data.management.retention.dataset.finder.TimeBasedDatasetStoreDatasetFinder; +import org.apache.gobblin.metastore.DatasetStoreDataset; +import org.apache.gobblin.runtime.FsDatasetStateStore; +import org.apache.gobblin.runtime.JobState; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.testng.Assert; +import org.testng.annotations.Test; +import com.google.common.io.Files; + + +/** + * Unit test for {@link org.apache.gobblin.data.management.retention.dataset.CleanableDatasetStoreDataset} + */ +public class CleanableDatasetStoreDatasetTest { + @Test + public void testCleanStateStore() throws IOException { + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + + FileSystem fs = FileSystem.getLocal(new Configuration()); + + FsDatasetStateStore store = new FsDatasetStateStore(fs, tmpDir.getAbsolutePath()); + + store.persistDatasetState("dataset1", new JobState.DatasetState("job1", "job1_id1")); + store.persistDatasetState("dataset1", new JobState.DatasetState("job1", "job1_id2")); + store.persistDatasetState("dataset1", new JobState.DatasetState("job2", "job2_id1")); + store.persistDatasetState("", new JobState.DatasetState("job3", "job3_id1")); + + Properties props = new Properties(); + + props.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, tmpDir.getAbsolutePath()); + props.setProperty("selection.timeBased.lookbackTime", "0m"); + + TimeBasedDatasetStoreDatasetFinder datasetFinder = new TimeBasedDatasetStoreDatasetFinder(fs, props); + List<DatasetStoreDataset> datasets = datasetFinder.findDatasets(); + + for (DatasetStoreDataset dataset : datasets) { + ((CleanableDataset) dataset).clean(); + File jobDir = new File(tmpDir.getAbsolutePath(), dataset.getKey().getStoreName()); + Assert.assertEquals(jobDir.list().length, 1); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-example/src/main/resources/state-store-retention.pull ---------------------------------------------------------------------- diff --git a/gobblin-example/src/main/resources/state-store-retention.pull b/gobblin-example/src/main/resources/state-store-retention.pull new file mode 100644 index 0000000..f8920cc --- /dev/null +++ b/gobblin-example/src/main/resources/state-store-retention.pull @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +type=hadoopJava +job.class=org.apache.gobblin.data.management.retention.DatasetCleanerJob + +gobblin.retention.dataset.finder.class=org.apache.gobblin.data.management.retention.dataset.finder.TimeBasedDatasetStoreDatasetFinder +selection.timeBased.lookbackTime=30m + +state.store.fs.uri=hdfs://localhost:9000 +state.store.dir=example/state-store \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java index b2fb04c..c4c7796 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java @@ -41,7 +41,7 @@ public abstract class StateStoreEntryManager<T extends State> { /** {@link StateStore} where this entry exists. */ private final StateStore stateStore; - private final long getTimestamp() { + public final long getTimestamp() { if (this.timestamp <= 0) { throw new RuntimeException("Timestamp is not reliable."); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/70cbe91b/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/retention/CleanableHivePartitionDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/retention/CleanableHivePartitionDataset.java b/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/retention/CleanableHivePartitionDataset.java index c8898f3..67ed73a 100644 --- a/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/retention/CleanableHivePartitionDataset.java +++ b/gobblin-modules/gobblin-compliance/src/main/java/org/apache/gobblin/compliance/retention/CleanableHivePartitionDataset.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import org.apache.commons.lang3.StringUtils; +import org.apache.gobblin.dataset.FileSystemDataset; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -48,7 +49,7 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils; * @author adsharma */ @Slf4j -public class CleanableHivePartitionDataset extends HivePartitionDataset implements CleanableDataset { +public class CleanableHivePartitionDataset extends HivePartitionDataset implements CleanableDataset, FileSystemDataset { private FileSystem fs; private State state;
