This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 726f43fba Minor fixes to manifest distcp to ensure that owner and
permission iterator is distinct, and fix setPermissionCommitStep which had an
empty map (#4000)
726f43fba is described below
commit 726f43fba3b9c2e9d3ec39660393f1089deacdbb
Author: William Lo <[email protected]>
AuthorDate: Tue Jul 16 21:49:20 2024 -0400
Minor fixes to manifest distcp to ensure that owner and permission iterator
is distinct, and fix setPermissionCommitStep which had an empty map (#4000)
---
.../data/management/copy/ManifestBasedDataset.java | 28 ++++++++-------
.../writer/FileAwareInputStreamDataWriter.java | 2 +-
.../CreateDirectoryWithPermissionsCommitStep.java | 2 +-
.../dataset/ManifestBasedDatasetFinderTest.java | 40 +++++++++++++++-------
4 files changed, 45 insertions(+), 27 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 6f2cccd7c..060d4acef 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -17,13 +17,6 @@
package org.apache.gobblin.data.management.copy;
-import com.google.common.base.Optional;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.gson.JsonIOException;
-import com.google.gson.JsonSyntaxException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -33,21 +26,31 @@ import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonSyntaxException;
+
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.commit.DeleteFileCommitStep;
import org.apache.gobblin.util.commit.CreateDirectoryWithPermissionsCommitStep;
+import org.apache.gobblin.util.commit.DeleteFileCommitStep;
import org.apache.gobblin.util.commit.SetPermissionCommitStep;
import org.apache.gobblin.util.filesystem.OwnerAndPermission;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
/**
* A dataset that is based on a Manifest. We expect the Manifest to contain the list
of all the files for this dataset.
@@ -139,6 +142,7 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
// Avoid duplicate calculation for the same ancestor
if (fromPath != null &&
!ancestorOwnerAndPermissions.containsKey(PathUtils.getPathWithoutSchemeAndAuthority(fromPath).toString())
&& !targetFs.exists(fromPath)) {
ancestorOwnerAndPermissions.put(fromPath.toString(),
copyableFile.getAncestorsOwnerAndPermission());
+
flattenedAncestorPermissions.putAll(CopyableFile.resolveReplicatedAncestorOwnerAndPermissionsRecursively(srcFs,
fromPath, new Path(commonFilesParent), configuration));
}
if (existOnTarget && srcFile.isFile()) {
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 03ccf99a3..4413ce503 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -473,7 +473,7 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
Iterator<OwnerAndPermission> ancestorOwnerAndPermissionIt =
copyableFile.getAncestorsOwnerAndPermission() == null ?
Collections.emptyIterator()
- : copyableFile.getAncestorsOwnerAndPermission().iterator();
+ : copyableFile.getAncestorsOwnerAndPermission().listIterator();
HadoopUtils.ensureDirectoryExists(this.fs, outputFilePath.getParent(),
ancestorOwnerAndPermissionIt, false);
if (copyableFile.getFileStatus().isDirectory() &&
this.fs.exists(outputFilePath)) {
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/CreateDirectoryWithPermissionsCommitStep.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/CreateDirectoryWithPermissionsCommitStep.java
index 768d8d2d1..8540fdc8c 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/CreateDirectoryWithPermissionsCommitStep.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/util/commit/CreateDirectoryWithPermissionsCommitStep.java
@@ -75,7 +75,7 @@ public class CreateDirectoryWithPermissionsCommitStep
implements CommitStep {
try {
// Is a no-op if directory already exists, stops when it hits first
parent
// Sets the execute bit for USER in order to rename files to the
folder, so it should be reset after this step is completed
- HadoopUtils.ensureDirectoryExists(fs, path,
entry.getValue().iterator(), throwOnError);
+ HadoopUtils.ensureDirectoryExists(fs, path,
entry.getValue().listIterator(), throwOnError);
} catch (IOException e) {
log.warn("Error while creating directory or setting owners/permission
on " + path, e);
if (this.throwOnError) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
index 8b5d84471..a5b87ef37 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
@@ -23,17 +23,8 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.data.management.copy.CopyConfiguration;
-import org.apache.gobblin.data.management.copy.CopyEntity;
-import org.apache.gobblin.data.management.copy.ManifestBasedDataset;
-import org.apache.gobblin.data.management.copy.ManifestBasedDatasetFinder;
-import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
-import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
-import org.apache.gobblin.data.management.partition.FileSet;
-import org.apache.gobblin.util.commit.CreateDirectoryWithPermissionsCommitStep;
-import org.apache.gobblin.util.commit.SetPermissionCommitStep;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -46,7 +37,20 @@ import org.testng.annotations.Test;
import com.google.common.io.Files;
-import static org.mockito.Mockito.*;
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.ManifestBasedDataset;
+import org.apache.gobblin.data.management.copy.ManifestBasedDatasetFinder;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.util.commit.CreateDirectoryWithPermissionsCommitStep;
+import org.apache.gobblin.util.commit.SetPermissionCommitStep;
+import org.apache.gobblin.util.filesystem.OwnerAndPermission;
+
+import static org.mockito.Mockito.any;
public class ManifestBasedDatasetFinderTest {
@@ -141,8 +145,18 @@ public class ManifestBasedDatasetFinderTest {
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 4); // 2 files to copy +
1 pre publish step + 1 post publish step
- Assert.assertTrue(((PrePublishStep)fileSet.getFiles().get(2)).getStep()
instanceof CreateDirectoryWithPermissionsCommitStep);
- Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(3)).getStep()
instanceof SetPermissionCommitStep);
+ CommitStep createDirectoryStep = ((PrePublishStep)
fileSet.getFiles().get(2)).getStep();
+ Assert.assertTrue(createDirectoryStep instanceof
CreateDirectoryWithPermissionsCommitStep);
+ Map<String, List<OwnerAndPermission>> pathAndPermissions =
((CreateDirectoryWithPermissionsCommitStep)
createDirectoryStep).getPathAndPermissions();
+ Assert.assertEquals(pathAndPermissions.size(), 1);
+ Assert.assertTrue(pathAndPermissions.containsKey("/tmp/dataset"));
+
+ CommitStep setPermissionStep = ((PostPublishStep)
fileSet.getFiles().get(3)).getStep();
+ Assert.assertTrue(setPermissionStep instanceof SetPermissionCommitStep);
+ Map<String, OwnerAndPermission> ownerAndPermissionMap =
((SetPermissionCommitStep) setPermissionStep).getPathAndPermissions();
+ Assert.assertEquals(ownerAndPermissionMap.size(), 2);
+ Assert.assertTrue(ownerAndPermissionMap.containsKey("/tmp/dataset"));
+ Assert.assertTrue(ownerAndPermissionMap.containsKey("/tmp"));
}
}