This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 389c15851 [GOBBLIN-1991] set (target) dataset path correctly in
RecursiveCopyableDataset (#3867)
389c15851 is described below
commit 389c15851248fecfb13b737ea1ddde31800896d5
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Jan 25 15:03:09 2024 -0800
[GOBBLIN-1991] set (target) dataset path correctly in
RecursiveCopyableDataset (#3867)
* set (target) dataset path correctly in RecursiveCopyableDataset
---
.../data/management/copy/CopyConfiguration.java | 27 +++++++++++++---------
.../management/copy/RecursiveCopyableDataset.java | 23 ++++++++++--------
.../copy/RecursiveCopyableDatasetTest.java | 25 ++++++++++++++++----
3 files changed, 50 insertions(+), 25 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
index 62271088c..4ea556e4d 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
@@ -19,12 +19,6 @@ package org.apache.gobblin.data.management.copy;
import java.util.Properties;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-
-import org.apache.gobblin.util.PropertiesUtils;
-import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -32,11 +26,17 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import
org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
import org.apache.gobblin.util.request_allocation.ResourcePool;
@@ -120,11 +120,8 @@ public class CopyConfiguration {
properties.containsKey(DESTINATION_GROUP_KEY) ?
Optional.of(properties.getProperty(DESTINATION_GROUP_KEY))
: Optional.<String>absent();
this.preserve =
PreserveAttributes.fromMnemonicString(properties.getProperty(PRESERVE_ATTRIBUTES_KEY));
- Path publishDirTmp = new
Path(properties.getProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR));
- if (!publishDirTmp.isAbsolute()) {
- publishDirTmp = new Path(targetFs.getWorkingDirectory(),
publishDirTmp);
- }
- this.publishDir = publishDirTmp;
+
+ this.publishDir = calculatePublishDir(targetFs, properties);
this.copyContext = new CopyContext();
this.targetFs = targetFs;
if (properties.containsKey(PRIORITIZER_ALIAS_KEY)) {
@@ -150,6 +147,14 @@ public class CopyConfiguration {
}
}
+ static Path calculatePublishDir(FileSystem targetFs, Properties properties) {
+ Path publishDirTmp = new
Path(properties.getProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR));
+ if (!publishDirTmp.isAbsolute()) {
+ publishDirTmp = new Path(targetFs.getWorkingDirectory(), publishDirTmp);
+ }
+ return publishDirTmp;
+ }
+
public static CopyConfigurationBuilder builder(FileSystem targetFs,
Properties properties) {
return new CopyConfigurationBuilder(targetFs, properties);
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 6b181011a..b47604a5d 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -66,10 +66,11 @@ public class RecursiveCopyableDataset implements
CopyableDataset, FileSystemData
public static final String USE_NEW_PRESERVE_LOGIC_KEY = CONFIG_PREFIX +
".useNewPreserveLogic";
private final Path rootPath;
+ private Path targetPath;
private final FileSystem fs;
private final PathFilter pathFilter;
- // Glob used to find this dataset
- private final Path glob;
+ // deepest non glob search path used to find this dataset
+ private final Path nonGlobSearchPath;
private final CopyableFileFilter copyableFileFilter;
private final boolean update;
private final boolean delete;
@@ -89,11 +90,8 @@ public class RecursiveCopyableDataset implements
CopyableDataset, FileSystemData
this.rootPath = PathUtils.getPathWithoutSchemeAndAuthority(rootPath);
this.fs = fs;
-
this.pathFilter = DatasetUtils.instantiatePathFilter(properties);
this.copyableFileFilter =
DatasetUtils.instantiateCopyableFileFilter(properties);
- this.glob = glob;
-
this.update = Boolean.parseBoolean(properties.getProperty(UPDATE_KEY));
this.delete = Boolean.parseBoolean(properties.getProperty(DELETE_KEY));
this.deleteEmptyDirectories =
Boolean.parseBoolean(properties.getProperty(DELETE_EMPTY_DIRECTORIES_KEY));
@@ -103,6 +101,7 @@ public class RecursiveCopyableDataset implements
CopyableDataset, FileSystemData
Boolean.parseBoolean(properties.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES,
"false"));
this.useNewPreserveLogic =
Boolean.parseBoolean(properties.getProperty(USE_NEW_PRESERVE_LOGIC_KEY));
this.properties = properties;
+ this.nonGlobSearchPath = PathUtils.deepestNonGlobPath(glob);
}
protected Collection<? extends CopyEntity>
getCopyableFilesImpl(CopyConfiguration configuration,
@@ -190,9 +189,8 @@ public class RecursiveCopyableDataset implements
CopyableDataset, FileSystemData
public Collection<? extends CopyEntity> getCopyableFiles(FileSystem
targetFs, CopyConfiguration configuration)
throws IOException {
- Path nonGlobSearchPath = PathUtils.deepestNonGlobPath(this.glob);
- Path targetPath =
- new Path(configuration.getPublishDir(),
PathUtils.relativizePath(this.rootPath, nonGlobSearchPath));
+ this.targetPath =
+ new Path(configuration.getPublishDir(),
PathUtils.relativizePath(this.rootPath, this.nonGlobSearchPath));
Map<Path, FileStatus> filesInSource =
createPathMap(getFilesAtPath(this.fs, this.rootPath, this.pathFilter),
this.rootPath);
@@ -239,7 +237,14 @@ public class RecursiveCopyableDataset implements
CopyableDataset, FileSystemData
}
@Override
+ // this is more like a getTargetDatasetPath
+ // this should preferably be called after the call to getCopyableFiles(),
+ // because targetPath is set in getCopyableFiles()
public String getDatasetPath() {
- return Path.getPathWithoutSchemeAndAuthority(this.rootPath).toString();
+ if (this.targetPath == null) {
+ // if getCopyableFiles(), which sets targetPath, has not been called yet,
+ // compute targetPath here from the publish dir and the relativized root path
+ this.targetPath = new
Path(CopyConfiguration.calculatePublishDir(this.fs, this.properties),
+ PathUtils.relativizePath(this.rootPath, this.nonGlobSearchPath));;
+ }
+ return Path.getPathWithoutSchemeAndAuthority(this.targetPath).toString();
}
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java
index d624f8250..ed87a7c21 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java
@@ -33,23 +33,38 @@ import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import javax.annotation.Nullable;
+import lombok.Data;
+
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import javax.annotation.Nullable;
-import lombok.Data;
-
public class RecursiveCopyableDatasetTest {
+ @Test
+ public void testTargetDatasetPath() throws Exception {
+ Path source = new Path("/source");
+ Path target = new Path("/target");
+
+ List<FileStatus> sourceFiles = Lists.newArrayList(createFileStatus(source,
"file1"), createFileStatus(source, "file2"));
+ List<FileStatus> targetFiles = Lists.newArrayList(createFileStatus(target,
"file3"));
+
+ Properties properties = new Properties();
+ properties.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR,
target.toString());
+ RecursiveCopyableDataset dataset = new
TestRecursiveCopyableDataset(source, target, sourceFiles, targetFiles,
properties);
+
+ Assert.assertEquals(dataset.getDatasetPath(), target.toString());
+ }
+
@Test
public void testSimpleCopy() throws Exception {
Path source = new Path("/source");