phet commented on code in PR #3616:
URL: https://github.com/apache/gobblin/pull/3616#discussion_r1099045862


##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java:
##########
@@ -402,10 +409,169 @@ public void testCommit() throws IOException {
     // previously existing paths should not have permissions changed
     fileStatus = this.fs.getFileStatus(existingOutputPath);
     Assert.assertEquals(fileStatus.getPermission(), existingPathPermission);
-
     Assert.assertFalse(this.fs.exists(writer.stagingDir));
   }
 
+  @Test
+  public void testCommitWithAclPreservationWhenAncestorPathsAbsent() throws 
IOException {
+    String fileName = "file";
+    // Asemble destination paths
+    String destinationExistingToken = "destination";
+    String destinationAdditionalTokens = "path";
+    Path destination = new Path(new Path(new Path("/", 
destinationExistingToken), destinationAdditionalTokens), fileName);
+    Path destinationWithoutLeadingSeparator = new Path(new 
Path(destinationExistingToken, destinationAdditionalTokens), fileName);
+
+    // Create temp directory
+    File tmpFile = Files.createTempDir();
+    tmpFile.deleteOnExit();
+    Path tmpPath = new Path(tmpFile.getAbsolutePath());
+
+    // create origin file
+    Path originFile = new Path(tmpPath, fileName);
+    this.fs.createNewFile(originFile);
+
+    // create stating dir
+    Path stagingDir = new Path(tmpPath, "staging");
+    this.fs.mkdirs(stagingDir);
+
+    // create output dir
+    Path outputDir = new Path(tmpPath, "output");
+    this.fs.mkdirs(outputDir);
+
+    // create copyable file
+    CopyableFile cf = createCopyableFile(originFile, destination, "a");
+
+    // create work unit state
+    WorkUnitState state = createWorkUnitState(stagingDir, outputDir, cf);
+    CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new 
TestCopyableDataset(new Path("/source")));
+    CopySource.serializeCopyEntity(state, cf);
+    CopySource.serializeCopyableDataset(state, metadata);
+    // create writer
+    FileAwareInputStreamDataWriter writer = new 
FileAwareInputStreamDataWriter(state, fs,1, 0, null);
+
+    // create output of writer.write
+    Path writtenFile = writer.getStagingFilePath(cf);
+    this.fs.mkdirs(writtenFile.getParent());
+    this.fs.createNewFile(writtenFile);
+    Path outputRoot = 
FileAwareInputStreamDataWriter.getPartitionOutputRoot(outputDir, 
cf.getDatasetAndPartition(metadata));
+    Path expectedOutputPath = new Path(outputRoot, 
destinationWithoutLeadingSeparator);
+    // check ancestor path is absent before commit
+    
Assert.assertFalse(this.fs.exists(expectedOutputPath.getParent().getParent()));
+    writer.actualProcessedCopyableFile = Optional.of(cf);
+    // commit
+    writer.commit();
+    // check ancestor path was created as part of commit
+    
Assert.assertTrue(this.fs.exists(expectedOutputPath.getParent().getParent()));
+    verifyAclEntries(writer, this.fs.getPathToAclEntries(), 
expectedOutputPath, outputDir);
+  }
+
+  @Test
+  public void testCommitWithAclPreservationWhenAncestorPathsPresent() throws 
IOException {
+    String fileName = "file";
+    // Asemble destination paths
+    String destinationExistingToken = "destination";
+    String destinationAdditionalTokens = "path";
+    Path destination = new Path(new Path(new Path("/", 
destinationExistingToken), destinationAdditionalTokens), fileName);
+    Path destinationWithoutLeadingSeparator = new Path(new 
Path(destinationExistingToken, destinationAdditionalTokens), fileName);
+
+    // Create temp directory
+    File tmpFile = Files.createTempDir();
+    tmpFile.deleteOnExit();
+    Path tmpPath = new Path(tmpFile.getAbsolutePath());
+
+    // create origin file
+    Path originFile = new Path(tmpPath, fileName);
+    this.fs.createNewFile(originFile);
+
+    // create stating dir
+    Path stagingDir = new Path(tmpPath, "staging");
+    this.fs.mkdirs(stagingDir);
+
+    // create output dir
+    Path outputDir = new Path(tmpPath, "output");
+    this.fs.mkdirs(outputDir);
+
+    // create copyable file
+    CopyableFile cf = createCopyableFile(originFile, destination, "a");
+
+    // create work unit state
+    WorkUnitState state = createWorkUnitState(stagingDir, outputDir, cf);
+    CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new 
TestCopyableDataset(new Path("/source")));
+    CopySource.serializeCopyEntity(state, cf);
+    CopySource.serializeCopyableDataset(state, metadata);
+    // create writer
+    FileAwareInputStreamDataWriter writer = new 
FileAwareInputStreamDataWriter(state, fs,1, 0, null);
+
+    // create output of writer.write
+    Path writtenFile = writer.getStagingFilePath(cf);
+    this.fs.mkdirs(writtenFile.getParent());
+    this.fs.createNewFile(writtenFile);
+    // create existing directories in writer output
+    Path outputRoot = 
FileAwareInputStreamDataWriter.getPartitionOutputRoot(outputDir, 
cf.getDatasetAndPartition(metadata));
+    Path existingOutputPath = new Path(outputRoot, destinationExistingToken);
+    Path expectedOutputPath = new Path(outputRoot, 
destinationWithoutLeadingSeparator);
+    // create output path and check ancestor path is present
+    this.fs.mkdirs(existingOutputPath);
+    
Assert.assertTrue(this.fs.exists(expectedOutputPath.getParent().getParent()));
+    writer.actualProcessedCopyableFile = Optional.of(cf);
+    // commit
+    writer.commit();
+    verifyAclEntries(writer, this.fs.getPathToAclEntries(), 
expectedOutputPath, existingOutputPath);
+  }
+
+  private void verifyAclEntries(FileAwareInputStreamDataWriter writer, 
ImmutableMap pathToAclEntries, Path expectedOutputPath, Path ancestorRoot) {
+    // fetching and preparing file paths from FileAwareInputStreamDataWriter 
object
+    Path outputDir = writer.outputDir;
+    String[] splitExpectedOutputPath = 
expectedOutputPath.toString().split("output");
+    Path dstOutputPath = new 
Path(outputDir.toString().concat(splitExpectedOutputPath[1])).getParent();
+
+    OwnerAndPermission destinationOwnerAndPermission = 
writer.actualProcessedCopyableFile.get().getDestinationOwnerAndPermission();
+    List<AclEntry> actual = destinationOwnerAndPermission.getAclEntries();
+    while (!dstOutputPath.equals(ancestorRoot)) {
+      List<AclEntry> expected = (List<AclEntry>) 
pathToAclEntries.get(dstOutputPath);
+      Assert.assertEquals(actual, expected);
+      dstOutputPath = dstOutputPath.getParent();
+    }
+  }
+
+  private WorkUnitState createWorkUnitState(Path stagingDir, Path outputDir, 
CopyableFile cf) {

Review Comment:
   nice abstractions, this and `createCopyableFile`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to