Repository: incubator-gobblin Updated Branches: refs/heads/master 8e974ef09 -> f43de8c4d
[GOBBLIN-586] Added enhancement to apply retention on remote HDFS Closes #2452 from amarnathkarthik/master Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f43de8c4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f43de8c4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f43de8c4 Branch: refs/heads/master Commit: f43de8c4d7dd0fa3521edc03a173f25873ba9814 Parents: 8e974ef Author: Karthik Amarnath <[email protected]> Authored: Mon Sep 17 21:52:41 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Sep 17 21:52:41 2018 -0700 ---------------------------------------------------------------------- .../data/management/retention/DatasetCleaner.java | 14 +++++++++----- .../data/management/retention/DatasetCleanerJob.java | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f43de8c4/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java index d0a0a96..6793f5a 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java @@ -42,6 +42,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import org.apache.gobblin.util.AzkabanTags; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.data.management.retention.dataset.CleanableDataset; import org.apache.gobblin.data.management.retention.profile.MultiCleanableDatasetFinder; @@ -56,6 +57,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.RateControlledFileSystem; import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor; +import org.apache.gobblin.util.WriterUtils; /** @@ -86,11 +88,14 @@ public class DatasetCleaner implements Instrumentable, Closeable { public DatasetCleaner(FileSystem fs, Properties props) throws IOException { + State state = new State(props); + FileSystem targetFs = + props.containsKey(ConfigurationKeys.WRITER_FILE_SYSTEM_URI) ? WriterUtils.getWriterFs(state) : fs; this.closer = Closer.create(); try { - FileSystem optionalRateControlledFs = fs; + FileSystem optionalRateControlledFs = targetFs; if (props.contains(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT)) { - optionalRateControlledFs = this.closer.register(new RateControlledFileSystem(fs, + optionalRateControlledFs = this.closer.register(new RateControlledFileSystem(targetFs, Long.parseLong(props.getProperty(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT)))); ((RateControlledFileSystem) optionalRateControlledFs).startRateControl(); } @@ -152,7 +157,6 @@ public class DatasetCleaner implements Instrumentable, Closeable { LOG.info("Successfully cleaned: " + dataset.datasetURN()); Instrumented.markMeter(DatasetCleaner.this.datasetsCleanSuccessMeter); } - }); } } @@ -196,8 +200,8 @@ public class DatasetCleaner implements Instrumentable, Closeable { @Override public void switchMetricContext(List<Tag<?>> tags) { - this.metricContext = this.closer - .register(Instrumented.newContextFromReferenceContext(this.metricContext, tags, Optional.<String> absent())); + this.metricContext = this.closer.register( + Instrumented.newContextFromReferenceContext(this.metricContext, tags, Optional.<String>absent())); this.regenerateMetrics(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f43de8c4/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java index c4a18cc..fbb6fe5 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleanerJob.java @@ -54,7 +54,7 @@ public class DatasetCleanerJob extends AbstractJob implements Tool { @Override public void run() throws Exception { if (this.datasetCleaner != null) { - this.datasetCleaner.clean(); + this.datasetCleaner.clean(); } }
