This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8be7fb637 [GOBBLIN-1720]Add ancestors owner permissions preservations
for iceberg distcp (#3577)
8be7fb637 is described below
commit 8be7fb637e9390ed76b4de30377d64908e229ea5
Author: meethngala <[email protected]>
AuthorDate: Tue Nov 15 11:47:24 2022 -0800
[GOBBLIN-1720]Add ancestors owner permissions preservations for iceberg
distcp (#3577)
* rebasing from master
* Rebasing from master
* fix checkstyle
* minor javadoc update
* updated path utils and iceberg dataset unit test for Fs permissions
* addressed comments on PR
* added counter test for not preserving Fs attributes
* minor: changed set to list
* fixed checkstyle
* adding verification for ancestor owner and permissions of Fs for iceberg
dataset test
Co-authored-by: Meeth Gala <[email protected]>
---
.../management/copy/iceberg/IcebergDataset.java | 10 +-
.../copy/iceberg/IcebergDatasetTest.java | 317 +++++++++++++++------
.../java/org/apache/gobblin/util/PathUtils.java | 16 ++
3 files changed, 261 insertions(+), 82 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index 006e9c1c6..63b23260f 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -51,10 +51,12 @@ import
org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.OwnerAndPermission;
import
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.request_allocation.PushDownRequestor;
/**
@@ -144,13 +146,17 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
FileStatus srcFileStatus = entry.getValue();
// TODO: should be the same FS each time; try creating once, reusing
thereafter, to not recreate wastefully
FileSystem actualSourceFs =
getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+ Path greatestAncestorPath = PathUtils.getRootPathChild(srcPath);
- // TODO: Add preservation of ancestor ownership and permissions!
-
+ // preserving ancestor permissions till root path's child between src
and dest
+ List<OwnerAndPermission> ancestorOwnerAndPermissionList =
+
CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(actualSourceFs,
+ srcPath.getParent(), greatestAncestorPath, copyConfig);
CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
actualSourceFs, srcFileStatus, targetFs.makeQualified(srcPath),
copyConfig)
.fileSet(fileSet)
.datasetOutputPath(targetFs.getUri().getPath())
+ .ancestorsOwnerAndPermission(ancestorOwnerAndPermissionList)
.build();
fileEntity.setSourceData(getSourceDataset(this.sourceFs));
fileEntity.setDestinationData(getDestinationDataset(targetFs));
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 159eca34c..d8683d250 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -32,30 +32,35 @@ import java.util.Properties;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
-
import java.util.stream.IntStream;
import java.util.stream.Stream;
-import lombok.Data;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+import com.google.api.client.util.Maps;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import joptsimple.internal.Strings;
+import lombok.Data;
+
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyContext;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.PreserveAttributes;
-import org.testng.collections.Sets;
import static org.mockito.Matchers.any;
@@ -65,6 +70,7 @@ public class IcebergDatasetTest {
private static final URI SRC_FS_URI;
private static final URI DEST_FS_URI;
+
static {
try {
SRC_FS_URI = new URI("abc", "the.source.org", "/", null);
@@ -80,20 +86,18 @@ public class IcebergDatasetTest {
private static final String MANIFEST_PATH_0 = ROOT_PATH +
"metadata/manifest.a";
private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a";
private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b";
- private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 = new
MockIcebergTable.SnapshotPaths(
- Optional.of(METADATA_PATH), MANIFEST_LIST_PATH_0,
- Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo(
- MANIFEST_PATH_0, Arrays.asList(MANIFEST_DATA_PATH_0A,
MANIFEST_DATA_PATH_0B)))
- );
+ private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 =
+ new MockIcebergTable.SnapshotPaths(Optional.of(METADATA_PATH),
MANIFEST_LIST_PATH_0, Arrays.asList(
+ new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_PATH_0,
+ Arrays.asList(MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B))));
private static final String MANIFEST_LIST_PATH_1 =
MANIFEST_LIST_PATH_0.replaceAll("\\.x$", ".y");
private static final String MANIFEST_PATH_1 =
MANIFEST_PATH_0.replaceAll("\\.a$", ".b");
private static final String MANIFEST_DATA_PATH_1A =
MANIFEST_DATA_PATH_0A.replaceAll("/p0/", "/p1/");
private static final String MANIFEST_DATA_PATH_1B =
MANIFEST_DATA_PATH_0B.replaceAll("/p0/", "/p1/");
- private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_1 = new
MockIcebergTable.SnapshotPaths(
- Optional.empty(), MANIFEST_LIST_PATH_1,
- Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo(
- MANIFEST_PATH_1, Arrays.asList(MANIFEST_DATA_PATH_1A,
MANIFEST_DATA_PATH_1B)))
- );
+ private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_1 =
+ new MockIcebergTable.SnapshotPaths(Optional.empty(),
MANIFEST_LIST_PATH_1, Arrays.asList(
+ new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_PATH_1,
+ Arrays.asList(MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B))));
private final String testDbName = "test_db_name";
private final String testTblName = "test_tbl_name";
@@ -150,10 +154,8 @@ public class IcebergDatasetTest {
List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1);
Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(),
SNAPSHOT_PATHS_0);
expectedResultPaths.remove(missingPath);
- validateGetFilePathsGivenDestState(
- icebergSnapshots,
-
Optional.of(existingSourcePaths.stream().map(Path::toString).collect(Collectors.toList())),
- existingDestPaths,
+ validateGetFilePathsGivenDestState(icebergSnapshots,
+
Optional.of(existingSourcePaths.stream().map(Path::toString).collect(Collectors.toList())),
existingDestPaths,
expectedResultPaths);
}
@@ -171,7 +173,8 @@ public class IcebergDatasetTest {
List<MockIcebergTable.SnapshotPaths> icebergSnapshots =
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
List<String> existingDestPaths = Lists.newArrayList(METADATA_PATH,
MANIFEST_LIST_PATH_1, MANIFEST_LIST_PATH_0);
Set<Path> expectedResultPaths = Sets.newHashSet(); // not expecting any
delta
- IcebergTable mockTable =
validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths,
expectedResultPaths);
+ IcebergTable mockTable =
+ validateGetFilePathsGivenDestState(icebergSnapshots,
existingDestPaths, expectedResultPaths);
// ensure short-circuiting was able to avert iceberg manifests scan
Mockito.verify(mockTable,
Mockito.times(1)).getCurrentSnapshotInfoOverviewOnly();
Mockito.verifyNoMoreInteractions(mockTable);
@@ -219,21 +222,20 @@ public class IcebergDatasetTest {
public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException {
List<String> expectedPaths = Arrays.asList(METADATA_PATH,
MANIFEST_LIST_PATH_0,
MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B);
-
MockFileSystemBuilder sourceBuilder = new
MockFileSystemBuilder(SRC_FS_URI);
sourceBuilder.addPaths(expectedPaths);
FileSystem sourceFs = sourceBuilder.build();
IcebergTable icebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
- IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName,
testTblName, icebergTable, new Properties(), sourceFs);
+ IcebergDataset icebergDataset =
+ new TrickIcebergDataset(testDbName, testTblName, icebergTable, new
Properties(), sourceFs);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
- CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs,
copyConfigProperties)
- .preserve(PreserveAttributes.fromMnemonicString(""))
- .copyContext(new CopyContext())
- .build();
+ CopyConfiguration copyConfiguration =
+ CopyConfiguration.builder(destFs,
copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+ .copyContext(new CopyContext()).build();
Collection<CopyEntity> copyEntities =
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
verifyCopyEntities(copyEntities, expectedPaths);
}
@@ -250,45 +252,102 @@ public class IcebergDatasetTest {
FileSystem sourceFs = sourceBuilder.build();
IcebergTable icebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1,
SNAPSHOT_PATHS_0));
- IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName,
testTblName, icebergTable, new Properties(), sourceFs);
+ IcebergDataset icebergDataset =
+ new TrickIcebergDataset(testDbName, testTblName, icebergTable, new
Properties(), sourceFs);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
- CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs,
copyConfigProperties)
- .preserve(PreserveAttributes.fromMnemonicString(""))
- .copyContext(new CopyContext())
- .build();
+ CopyConfiguration copyConfiguration =
+ CopyConfiguration.builder(destFs,
copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+ .copyContext(new CopyContext()).build();
Collection<CopyEntity> copyEntities =
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
verifyCopyEntities(copyEntities, expectedPaths);
}
+ @Test
+ public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws
IOException {
+ FileStatus metadataFileStatus = new FileStatus(0, false, 0, 0, 0, 0, new
FsPermission(FsAction.WRITE, FsAction.READ, FsAction.NONE), "metadata_owner",
"metadata_group", null);
+ FileStatus manifestFileStatus = new FileStatus(0, false, 0, 0, 0, 0, new
FsPermission(FsAction.WRITE, FsAction.READ, FsAction.NONE),
"manifest_list_owner", "manifest_list_group", null);
+ FileStatus manifestDataFileStatus = new FileStatus(0, false, 0, 0, 0, 0,
new FsPermission(FsAction.WRITE_EXECUTE, FsAction.READ_EXECUTE, FsAction.NONE),
"manifest_data_owner", "manifest_data_group", null);
+ Map<String, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap();
+ expectedPathsAndFileStatuses.put(METADATA_PATH, metadataFileStatus);
+ expectedPathsAndFileStatuses.put(MANIFEST_PATH_0, manifestFileStatus);
+ expectedPathsAndFileStatuses.put(MANIFEST_LIST_PATH_0, manifestFileStatus);
+ expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0A,
manifestDataFileStatus);
+ expectedPathsAndFileStatuses.put(MANIFEST_DATA_PATH_0B,
manifestDataFileStatus);
+
+ MockFileSystemBuilder sourceBuilder = new
MockFileSystemBuilder(SRC_FS_URI);
+ sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses);
+ FileSystem sourceFs = sourceBuilder.build();
+
+ IcebergTable icebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+ IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName,
testTblName, icebergTable, new Properties(), sourceFs);
+
+ MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
+ FileSystem destFs = destBuilder.build();
+
+ CopyConfiguration copyConfiguration =
+ CopyConfiguration.builder(destFs, copyConfigProperties)
+ // preserving attributes for owner, group and permissions
respectively
+ .preserve(PreserveAttributes.fromMnemonicString("ugp"))
+ .copyContext(new CopyContext()).build();
+
+ Collection<CopyEntity> copyEntities =
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
+ verifyFsOwnershipAndPermissionPreservation(copyEntities,
sourceBuilder.getPathsAndFileStatuses());
+ }
+
+ @Test
+ public void testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty()
throws IOException {
+ List<String> expectedPaths = Arrays.asList(METADATA_PATH,
MANIFEST_LIST_PATH_0,
+ MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B);
+ Map<Path, FileStatus> expectedPathsAndFileStatuses = Maps.newHashMap();
+ for (String expectedPath : expectedPaths) {
+ expectedPathsAndFileStatuses.putIfAbsent(new Path(expectedPath), new
FileStatus());
+ }
+ MockFileSystemBuilder sourceBuilder = new
MockFileSystemBuilder(SRC_FS_URI);
+ sourceBuilder.addPaths(expectedPaths);
+ FileSystem sourceFs = sourceBuilder.build();
+
+ IcebergTable icebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+ IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName,
testTblName, icebergTable, new Properties(), sourceFs);
+
+ MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
+ FileSystem destFs = destBuilder.build();
+
+ CopyConfiguration copyConfiguration =
+ CopyConfiguration.builder(destFs, copyConfigProperties)
+ // without preserving attributes for owner, group and permissions
+ .preserve(PreserveAttributes.fromMnemonicString(""))
+ .copyContext(new CopyContext()).build();
+
+ Collection<CopyEntity> copyEntities =
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
+ verifyFsOwnershipAndPermissionPreservation(copyEntities,
expectedPathsAndFileStatuses);
+ }
+
/**
* exercise {@link IcebergDataset::getFilePaths} and validate the result
* @return {@link IcebergTable} (mock!), for behavioral verification
*/
- protected IcebergTable validateGetFilePathsGivenDestState(
- List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets,
- List<String> existingDestPaths,
- Set<Path> expectedResultPaths) throws IOException {
- return validateGetFilePathsGivenDestState(sourceSnapshotPathSets,
Optional.empty(),existingDestPaths, expectedResultPaths);
+ protected IcebergTable
validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths>
sourceSnapshotPathSets,
+ List<String> existingDestPaths, Set<Path> expectedResultPaths) throws
IOException {
+ return validateGetFilePathsGivenDestState(sourceSnapshotPathSets,
Optional.empty(), existingDestPaths,
+ expectedResultPaths);
}
/**
* exercise {@link IcebergDataset::getFilePaths} and validate the result
* @return {@link IcebergTable} (mock!), for behavioral verification
*/
- protected IcebergTable validateGetFilePathsGivenDestState(
- List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets,
- Optional<List<String>> optExistingSourcePaths,
- List<String> existingDestPaths,
- Set<Path> expectedResultPaths) throws IOException {
+ protected IcebergTable
validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths>
sourceSnapshotPathSets,
+ Optional<List<String>> optExistingSourcePaths, List<String>
existingDestPaths, Set<Path> expectedResultPaths) throws IOException {
IcebergTable icebergTable =
MockIcebergTable.withSnapshots(sourceSnapshotPathSets);
MockFileSystemBuilder sourceFsBuilder = new
MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent());
optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths);
FileSystem sourceFs = sourceFsBuilder.build();
- IcebergDataset icebergDataset = new IcebergDataset(testDbName,
testTblName, icebergTable, new Properties(), sourceFs);
+ IcebergDataset icebergDataset =
+ new IcebergDataset(testDbName, testTblName, icebergTable, new
Properties(), sourceFs);
MockFileSystemBuilder destFsBuilder = new
MockFileSystemBuilder(DEST_FS_URI);
destFsBuilder.addPaths(existingDestPaths);
@@ -307,42 +366,65 @@ public class IcebergDatasetTest {
/** @return `paths` after adding to it all paths of every one of
`snapshotDefs` */
protected static Set<Path> withAllSnapshotPaths(Set<Path> paths,
MockIcebergTable.SnapshotPaths... snapshotDefs) {
Arrays.stream(snapshotDefs).flatMap(snapshotDef ->
- snapshotDef.asSnapshotInfo().getAllPaths().stream()
- ).forEach(p ->
- paths.add(new Path(p))
- );
+ snapshotDef.asSnapshotInfo().getAllPaths().stream())
+ .forEach(p ->
+ paths.add(new Path(p))
+ );
return paths;
}
private CopyConfiguration createEmptyCopyConfiguration(FileSystem fs) {
- return CopyConfiguration.builder(fs, copyConfigProperties)
- .copyContext(new CopyContext())
- .build();
+ return CopyConfiguration.builder(fs, copyConfigProperties).copyContext(new
CopyContext()).build();
}
private static void verifyCopyEntities(Collection<CopyEntity> copyEntities,
List<String> expected) {
List<String> actual = new ArrayList<>();
for (CopyEntity copyEntity : copyEntities) {
String json = copyEntity.toString();
- String filepath = new Gson().fromJson(json, JsonObject.class)
- .getAsJsonObject("object-data").getAsJsonObject("origin")
- .getAsJsonObject("object-data").getAsJsonObject("path")
- .getAsJsonObject("object-data").getAsJsonObject("uri")
- .getAsJsonPrimitive("object-data").getAsString();
+ String filepath =
CopyEntityDeserializer.getFilePathAsStringFromJson(json);
actual.add(filepath);
}
- Assert.assertEquals(actual.size(), expected.size(),
- "Set" + actual.toString() + " vs Set" + expected.toString());
+ Assert.assertEquals(actual.size(), expected.size(), "Set" +
actual.toString() + " vs Set" + expected.toString());
Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
}
+ private static void
verifyFsOwnershipAndPermissionPreservation(Collection<CopyEntity> copyEntities,
Map<Path, FileStatus> expectedPathsAndFileStatuses) {
+ for (CopyEntity copyEntity : copyEntities) {
+ String copyEntityJson = copyEntity.toString();
+ List<CopyEntityDeserializer.FileOwnerAndPermissions>
ancestorFileOwnerAndPermissionsList =
CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson);
+ CopyEntityDeserializer.FileOwnerAndPermissions
destinationFileOwnerAndPermissions =
CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson);
+ Path filePath = new
Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson));
+ FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath);
+ verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus);
+ // providing path's parent to verify ancestor owner and permissions
+ verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList,
filePath.getParent(), expectedPathsAndFileStatuses);
+ }
+ }
+
+ private static void
verifyFileStatus(CopyEntityDeserializer.FileOwnerAndPermissions actual,
FileStatus expected) {
+ Assert.assertEquals(actual.owner, expected.getOwner());
+ Assert.assertEquals(actual.group, expected.getGroup());
+ Assert.assertEquals(actual.userActionPermission,
expected.getPermission().getUserAction().toString());
+ Assert.assertEquals(actual.groupActionPermission,
expected.getPermission().getGroupAction().toString());
+ Assert.assertEquals(actual.otherActionPermission,
expected.getPermission().getOtherAction().toString());
+ }
+
+ private static void
verifyAncestorPermissions(List<CopyEntityDeserializer.FileOwnerAndPermissions>
actualList, Path path, Map<Path, FileStatus> pathFileStatusMap) {
+
+ for (CopyEntityDeserializer.FileOwnerAndPermissions actual : actualList) {
+ FileStatus expected = pathFileStatusMap.getOrDefault(path, new
FileStatus());
+ verifyFileStatus(actual, expected);
+ path = path.getParent();
+ }
+ }
/**
* Sadly, this is needed to avoid losing `FileSystem` mock to replacement
from the `FileSystem.get` `static`
* Without this, so to lose the mock, we'd be unable to set up any source
paths as existing.
*/
protected static class TrickIcebergDataset extends IcebergDataset {
- public TrickIcebergDataset(String db, String table, IcebergTable
icebergTbl, Properties properties, FileSystem sourceFs) {
+ public TrickIcebergDataset(String db, String table, IcebergTable
icebergTbl, Properties properties,
+ FileSystem sourceFs) {
super(db, table, icebergTbl, properties, sourceFs);
}
@@ -350,14 +432,15 @@ public class IcebergDatasetTest {
protected FileSystem getSourceFileSystemFromFileStatus(FileStatus
fileStatus, Configuration hadoopConfig) throws IOException {
return this.sourceFs;
}
- };
+ }
+ ;
/** Builds a {@link FileSystem} mock */
protected static class MockFileSystemBuilder {
private final URI fsURI;
/** when not `.isPresent()`, all paths exist; when `.get().isEmpty()`,
none exist; else only those indicated do */
- private final Optional<Set<Path>> optPaths;
+ private final Optional<Map<Path, FileStatus>> optPathsWithFileStatuses;
public MockFileSystemBuilder(URI fsURI) {
this(fsURI, false);
@@ -365,47 +448,63 @@ public class IcebergDatasetTest {
public MockFileSystemBuilder(URI fsURI, boolean shouldRepresentEveryPath) {
this.fsURI = fsURI;
- this.optPaths = shouldRepresentEveryPath ? Optional.empty() :
Optional.of(Sets.newHashSet());
- }
-
- public Optional<Set<Path>> getPaths() {
- return this.optPaths.map(Sets::newHashSet); // copy before returning
+ this.optPathsWithFileStatuses = shouldRepresentEveryPath ?
Optional.empty() : Optional.of(Maps.newHashMap());
}
public void addPaths(List<String> pathStrings) {
+ Map<String, FileStatus> map = Maps.newHashMap();
for (String pathString : pathStrings) {
- addPath(pathString);
+ map.putIfAbsent(pathString, null);
}
+ addPathsAndFileStatuses(map);
}
- public void addPath(String pathString) {
- addPath(new Path(pathString));
+ public void addPathsAndFileStatuses(Map<String, FileStatus>
pathAndFileStatuses) {
+ for (Map.Entry<String, FileStatus> entry :
pathAndFileStatuses.entrySet()) {
+ String pathString = entry.getKey();
+ FileStatus fileStatus = entry.getValue();
+ addPathsAndFileStatuses(pathString, fileStatus);
+ }
}
- public void addPath(Path path) {
- if (!this.optPaths.isPresent()) {
- throw new IllegalStateException("unable to add paths when constructed
with `shouldRepresentEveryPath == true`");
+ public void addPathsAndFileStatuses(String pathString, FileStatus
fileStatus) {
+ Path path = new Path(pathString);
+ if(fileStatus != null) { fileStatus.setPath(path);}
+ addPathAndFileStatus(path, fileStatus);
+ }
+
+ public void addPathAndFileStatus(Path path, FileStatus fileStatus) {
+ if (!this.optPathsWithFileStatuses.isPresent()) {
+ throw new IllegalStateException("unable to add paths and file statuses
when constructed");
}
- if (this.optPaths.get().add(path) && !path.isRoot()) { // recursively
add ancestors of a previously unknown path
- addPath(path.getParent());
+ optPathsWithFileStatuses.get().putIfAbsent(path, fileStatus);
+ if (!path.isRoot()) { // recursively add ancestors of a previously
unknown path
+ addPathAndFileStatus(path.getParent(), fileStatus);
}
}
- public FileSystem build() throws IOException {
+ public Map<Path, FileStatus> getPathsAndFileStatuses() {
+ return optPathsWithFileStatuses.get();
+ }
+
+ public FileSystem build()
+ throws IOException {
FileSystem fs = Mockito.mock(FileSystem.class);
Mockito.when(fs.getUri()).thenReturn(fsURI);
Mockito.when(fs.makeQualified(any(Path.class)))
.thenAnswer(invocation -> invocation.getArgumentAt(0,
Path.class).makeQualified(fsURI, new Path("/")));
- if (!this.optPaths.isPresent()) {
- Mockito.when(fs.getFileStatus(any(Path.class))).thenAnswer(invocation
->
- createEmptyFileStatus(invocation.getArgumentAt(0,
Path.class).toString()));
+ if (!this.optPathsWithFileStatuses.isPresent()) {
+ Mockito.when(fs.getFileStatus(any(Path.class)))
+ .thenAnswer(invocation ->
createEmptyFileStatus(invocation.getArgumentAt(0, Path.class).toString()));
} else {
// WARNING: order is critical--specific paths *after* `any(Path)`; in
addition, since mocking further
// an already-mocked instance, `.doReturn/.when` is needed (vs.
`.when/.thenReturn`)
Mockito.when(fs.getFileStatus(any(Path.class))).thenThrow(new
FileNotFoundException());
- for (Path p : this.optPaths.get()) {
-
Mockito.doReturn(createEmptyFileStatus(p.toString())).when(fs).getFileStatus(p);
+ for (Map.Entry<Path, FileStatus> entry :
this.optPathsWithFileStatuses.get().entrySet()) {
+ Path p = entry.getKey();
+ FileStatus fileStatus = entry.getValue();
+ Mockito.doReturn(fileStatus != null ? fileStatus :
createEmptyFileStatus(p.toString())).when(fs).getFileStatus(p);
}
}
return fs;
@@ -419,7 +518,6 @@ public class IcebergDatasetTest {
}
}
-
private static class MockIcebergTable {
@Data
@@ -445,8 +543,8 @@ public class IcebergDatasetTest {
public static IcebergTable withSnapshots(List<SnapshotPaths>
snapshotPathSets) throws IOException {
IcebergTable table = Mockito.mock(IcebergTable.class);
int lastIndex = snapshotPathSets.size() - 1;
- Mockito.when(table.getCurrentSnapshotInfoOverviewOnly()).thenReturn(
- snapshotPathSets.get(lastIndex).asSnapshotInfo(lastIndex));
+ Mockito.when(table.getCurrentSnapshotInfoOverviewOnly())
+
.thenReturn(snapshotPathSets.get(lastIndex).asSnapshotInfo(lastIndex));
// ADMISSION: this is strictly more analogous to
`IcebergTable.getAllSnapshotInfosIterator()`, as it doesn't
// filter only the delta... nonetheless, it should work fine for the
tests herein
Mockito.when(table.getIncrementalSnapshotInfosIterator()).thenReturn(
@@ -464,5 +562,64 @@ public class IcebergDatasetTest {
inputs, IntStream.iterate(0, i -> i + 1).boxed(), f);
}
}
+
+ private static class CopyEntityDeserializer {
+
+ @Data
+ public static class FileOwnerAndPermissions {
+ String owner;
+ String group;
+ // assigning default values
+ String userActionPermission = FsAction.valueOf("READ_WRITE").toString();
+ String groupActionPermission = FsAction.valueOf("READ_WRITE").toString();
+ String otherActionPermission = FsAction.valueOf("READ_WRITE").toString();
+ }
+
+ public static String getFilePathAsStringFromJson(String json) {
+ String filepath = new Gson().fromJson(json, JsonObject.class)
+ .getAsJsonObject("object-data").getAsJsonObject("origin")
+
.getAsJsonObject("object-data").getAsJsonObject("path").getAsJsonObject("object-data")
+
.getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString();
+ return filepath;
+ }
+
+ public static List<FileOwnerAndPermissions>
getAncestorOwnerAndPermissions(String json) {
+ JsonArray ancestorsOwnerAndPermissions = new Gson().fromJson(json,
JsonObject.class)
+ .getAsJsonObject("object-data")
+ .getAsJsonArray("ancestorsOwnerAndPermission");
+ List<FileOwnerAndPermissions> fileOwnerAndPermissionsList =
Lists.newArrayList();
+ for (JsonElement jsonElement : ancestorsOwnerAndPermissions) {
+
fileOwnerAndPermissionsList.add(getFileOwnerAndPermissions(jsonElement.getAsJsonObject()));
+ }
+ return fileOwnerAndPermissionsList;
+ }
+
+ public static FileOwnerAndPermissions
getDestinationOwnerAndPermissions(String json) {
+ JsonObject destinationOwnerAndPermissionsJsonObject = new
Gson().fromJson(json, JsonObject.class)
+ .getAsJsonObject("object-data")
+ .getAsJsonObject("destinationOwnerAndPermission");
+ FileOwnerAndPermissions fileOwnerAndPermissions =
getFileOwnerAndPermissions(destinationOwnerAndPermissionsJsonObject);
+ return fileOwnerAndPermissions;
+ }
+
+ private static FileOwnerAndPermissions
getFileOwnerAndPermissions(JsonObject jsonObject) {
+ FileOwnerAndPermissions fileOwnerAndPermissions = new
FileOwnerAndPermissions();
+ JsonObject objData = jsonObject.getAsJsonObject("object-data");
+ fileOwnerAndPermissions.owner = objData.has("owner") ?
objData.getAsJsonPrimitive("owner").getAsString() : Strings.EMPTY;
+ fileOwnerAndPermissions.group = objData.has("group") ?
objData.getAsJsonPrimitive("group").getAsString() : Strings.EMPTY;
+
+ JsonObject fsPermission = objData.has("fsPermission") ?
objData.getAsJsonObject("fsPermission") : null;
+ if (fsPermission != null) {
+ JsonObject objectData = fsPermission.getAsJsonObject("object-data");
+ fileOwnerAndPermissions.userActionPermission =
+
objectData.getAsJsonObject("useraction").getAsJsonPrimitive("object-data").getAsString();
+ fileOwnerAndPermissions.groupActionPermission =
+
objectData.getAsJsonObject("groupaction").getAsJsonPrimitive("object-data").getAsString();
+ fileOwnerAndPermissions.otherActionPermission =
+
objectData.getAsJsonObject("otheraction").getAsJsonPrimitive("object-data").getAsString();
+ }
+ return fileOwnerAndPermissions;
+ }
+ }
}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
index 64aa0a39e..28a9114cc 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
@@ -80,6 +80,36 @@ public class PathUtils {
return getRootPath(path.getParent());
}
+  /**
+   * Returns the ancestor of {@code path} that is an immediate child of the filesystem root.
+   * Example: for {@code /a/b/c} this returns {@code /a}; for {@code /a} it returns {@code /a} itself.
+   *
+   * <p>Walks the parent chain iteratively, so arbitrarily deep paths cannot overflow the stack.
+   *
+   * @param path an absolute path (optionally qualified with scheme and authority)
+   * @return the child of the root that is {@code path} or one of its ancestors,
+   *         or {@code null} if {@code path} is itself the root
+   * @throws IllegalArgumentException if {@code path} is relative: its parent chain ends in
+   *         {@code "."} rather than at a root, so no root child exists
+   */
+  public static Path getRootPathChild(Path path) {
+    if (!path.isAbsolute()) {
+      throw new IllegalArgumentException("Expected an absolute path but got: " + path);
+    }
+    Path parent = path.getParent();
+    if (parent == null) {
+      // `path` is the root itself, which has no root child
+      return null;
+    }
+    Path current = path;
+    // climb until `current` sits directly beneath the root
+    while (!parent.isRoot()) {
+      current = parent;
+      parent = current.getParent();
+    }
+    return current;
+  }
+
/**
* Removes the leading slash if present.
*