This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 80bcfb7 set RETENTION_DATASET_ROOT in CleanableIcebergDataset so that any retention job can use this information (#3422)
80bcfb7 is described below
commit 80bcfb752a2c8dc15465f70a2ea142482147d3da
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Nov 2 17:25:55 2021 -0700
set RETENTION_DATASET_ROOT in CleanableIcebergDataset so that any retention
job can use this information (#3422)
---
.../ManagedIcebergCleanableDatasetFinder.java | 22 +++++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ManagedIcebergCleanableDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ManagedIcebergCleanableDatasetFinder.java
index a2684e4..f45b798 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ManagedIcebergCleanableDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ManagedIcebergCleanableDatasetFinder.java
@@ -17,10 +17,16 @@
package org.apache.gobblin.data.management.retention.profile;
-import com.typesafe.config.Config;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
import org.apache.gobblin.config.client.ConfigClient;
import org.apache.gobblin.config.client.ConfigClientCache;
import
org.apache.gobblin.config.client.api.ConfigStoreFactoryDoesNotExistsException;
@@ -28,12 +34,10 @@ import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
import org.apache.gobblin.config.store.api.ConfigStoreCreationException;
import org.apache.gobblin.config.store.api.VersionDoesNotExistException;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import
org.apache.gobblin.data.management.retention.dataset.CleanableIcebergDataset;
import
org.apache.gobblin.data.management.retention.dataset.ConfigurableCleanableDataset;
+import org.apache.gobblin.data.management.retention.dataset.FsCleanableHelper;
import org.apache.gobblin.data.management.version.FileSystemDatasetVersion;
-import
org.apache.gobblin.data.management.retention.dataset.CleanableIcebergDataset;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.LoggerFactory;
public class ManagedIcebergCleanableDatasetFinder extends
ManagedCleanableDatasetFinder {
@@ -51,9 +55,13 @@ public class ManagedIcebergCleanableDatasetFinder extends ManagedCleanableDatase
@Override
public ConfigurableCleanableDataset<FileSystemDatasetVersion>
datasetAtPath(Path path) throws IOException {
+ Properties datasetProps = new Properties();
+ datasetProps.putAll(this.props);
+ datasetProps.setProperty(FsCleanableHelper.RETENTION_DATASET_ROOT,
path.toString());
+
try {
- return new CleanableIcebergDataset<>(this.fs, this.props, path,
-
this.client.getConfig(this.props.getProperty(ConfigurationKeys.CONFIG_MANAGEMENT_STORE_URI)
+ ICEBERG_CONFIG_PREFIX + path.toString()),
+ return new CleanableIcebergDataset<>(this.fs, datasetProps, path,
+
this.client.getConfig(this.props.getProperty(ConfigurationKeys.CONFIG_MANAGEMENT_STORE_URI)
+ ICEBERG_CONFIG_PREFIX + path),
LoggerFactory.getLogger(CleanableIcebergDataset.class));
} catch (ConfigStoreFactoryDoesNotExistsException |
ConfigStoreCreationException | URISyntaxException |
VersionDoesNotExistException var3) {
throw new IllegalArgumentException(var3);