aplex commented on a change in pull request #3054:
URL: https://github.com/apache/incubator-gobblin/pull/3054#discussion_r449186127



##########
File path: 
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
##########
@@ -78,49 +82,101 @@ public void setup() throws Exception {
   public void testWrite() throws Exception {
     String streamString1 = "testContents1";
     String streamString2 = "testContents2";
+
     String userDefStagingDir = System.getProperty("user.dir") + 
"/user_staging_dir";
+
+    testRootDir = new Path(Paths.get("").toAbsolutePath().toString(),
+        getClass().getSimpleName());
+    localFs = FileSystem.getLocal(new Configuration());
+
+    Path ds1 = createDatasetPath("db1");
+    String dataSetDefinedStagingDir = testRootDir + "/db1/staging";
+
     FileStatus status = fs.getFileStatus(testTempPath);
     OwnerAndPermission ownerAndPermission =
         new OwnerAndPermission(status.getOwner(), status.getGroup(), new 
FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
     CopyableFile cf = 
CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+
     CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new 
TestCopyableDataset(new Path("/source")));
+
     WorkUnitState state = TestUtils.createTestWorkUnitState();
+    state.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,false);
     state.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false);
     state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, 
"staging").toString());
     state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, 
"output").toString());
     state.setProp(ConfigurationKeys.WRITER_FILE_PATH, 
RandomStringUtils.randomAlphabetic(5));
     CopySource.serializeCopyEntity(state, cf);
     CopySource.serializeCopyableDataset(state, metadata);
+
     FileAwareInputStreamDataWriter dataWriter = new 
FileAwareInputStreamDataWriter(state, 1, 0);
+
     FileAwareInputStream fileAwareInputStream = 
FileAwareInputStream.builder().file(cf)
         
.inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString1))).build();
+
     Assert.assertNotEquals(dataWriter.stagingDir,userDefStagingDir);
+    Assert.assertNotEquals(dataWriter.stagingDir,dataSetDefinedStagingDir);
+
     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
     Path writtenFilePath = new Path(new 
Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
         cf.getDatasetAndPartition(metadata).identifier()), 
cf.getDestination());
+
     Assert.assertEquals(IOUtils.toString(new 
FileInputStream(writtenFilePath.toString())), streamString1);
 
     //testing user defined staging directory
     WorkUnitState state2 = TestUtils.createTestWorkUnitState();
+    state2.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,false);
     state2.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,true);
     state2.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new 
Path(testTempPath, "staging").toString());
     state2.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, 
"output2").toString());
     
state2.setProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR,userDefStagingDir);
     state2.setProp(ConfigurationKeys.WRITER_FILE_PATH, 
RandomStringUtils.randomAlphabetic(5));
     CopySource.serializeCopyEntity(state2, cf);
     CopySource.serializeCopyableDataset(state2, metadata);
+
     dataWriter = new FileAwareInputStreamDataWriter(state2, 1, 0);
+
     fileAwareInputStream = FileAwareInputStream.builder().file(cf)
         
.inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString2))).build();
+
     
Assert.assertEquals(dataWriter.stagingDir.toUri().toString(),userDefStagingDir);
+    Assert.assertNotEquals(dataWriter.stagingDir,dataSetDefinedStagingDir);
+
     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
     writtenFilePath = new Path(new 
Path(state2.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
         cf.getDatasetAndPartition(metadata).identifier()), 
cf.getDestination());
+
     Assert.assertEquals(IOUtils.toString(new 
FileInputStream(writtenFilePath.toString())), streamString2);
+
+    //testing dataset defined staging directory

Review comment:
       Can we split this large test into several smaller unit tests that each 
focus on a specific use case? Then it will be easier to understand what we are 
testing and expecting. The common logic can be moved to separate methods 
that we call explicitly. TestNG can also call them automatically, before each 
test method or once per class, if we put a @BeforeMethod or @BeforeClass 
annotation on them.

##########
File path: 
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
##########
@@ -78,49 +82,101 @@ public void setup() throws Exception {
   public void testWrite() throws Exception {
     String streamString1 = "testContents1";
     String streamString2 = "testContents2";
+
     String userDefStagingDir = System.getProperty("user.dir") + 
"/user_staging_dir";
+
+    testRootDir = new Path(Paths.get("").toAbsolutePath().toString(),
+        getClass().getSimpleName());
+    localFs = FileSystem.getLocal(new Configuration());
+
+    Path ds1 = createDatasetPath("db1");
+    String dataSetDefinedStagingDir = testRootDir + "/db1/staging";
+
     FileStatus status = fs.getFileStatus(testTempPath);
     OwnerAndPermission ownerAndPermission =
         new OwnerAndPermission(status.getOwner(), status.getGroup(), new 
FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
     CopyableFile cf = 
CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+
     CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new 
TestCopyableDataset(new Path("/source")));
+
     WorkUnitState state = TestUtils.createTestWorkUnitState();
+    state.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,false);
     state.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false);
     state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, 
"staging").toString());
     state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, 
"output").toString());
     state.setProp(ConfigurationKeys.WRITER_FILE_PATH, 
RandomStringUtils.randomAlphabetic(5));
     CopySource.serializeCopyEntity(state, cf);
     CopySource.serializeCopyableDataset(state, metadata);
+
     FileAwareInputStreamDataWriter dataWriter = new 
FileAwareInputStreamDataWriter(state, 1, 0);
+
     FileAwareInputStream fileAwareInputStream = 
FileAwareInputStream.builder().file(cf)
         
.inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString1))).build();
+
     Assert.assertNotEquals(dataWriter.stagingDir,userDefStagingDir);
+    Assert.assertNotEquals(dataWriter.stagingDir,dataSetDefinedStagingDir);
+
     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
     Path writtenFilePath = new Path(new 
Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
         cf.getDatasetAndPartition(metadata).identifier()), 
cf.getDestination());
+
     Assert.assertEquals(IOUtils.toString(new 
FileInputStream(writtenFilePath.toString())), streamString1);
 
     //testing user defined staging directory
     WorkUnitState state2 = TestUtils.createTestWorkUnitState();
+    state2.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,false);
     state2.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,true);
     state2.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new 
Path(testTempPath, "staging").toString());
     state2.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, 
"output2").toString());
     
state2.setProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR,userDefStagingDir);
     state2.setProp(ConfigurationKeys.WRITER_FILE_PATH, 
RandomStringUtils.randomAlphabetic(5));
     CopySource.serializeCopyEntity(state2, cf);
     CopySource.serializeCopyableDataset(state2, metadata);
+
     dataWriter = new FileAwareInputStreamDataWriter(state2, 1, 0);
+
     fileAwareInputStream = FileAwareInputStream.builder().file(cf)
         
.inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString2))).build();
+
     
Assert.assertEquals(dataWriter.stagingDir.toUri().toString(),userDefStagingDir);
+    Assert.assertNotEquals(dataWriter.stagingDir,dataSetDefinedStagingDir);
+
     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
     writtenFilePath = new Path(new 
Path(state2.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
         cf.getDatasetAndPartition(metadata).identifier()), 
cf.getDestination());
+
     Assert.assertEquals(IOUtils.toString(new 
FileInputStream(writtenFilePath.toString())), streamString2);
+
+    //testing dataset defined staging directory
+    CopyableDatasetMetadata metadata2 = new CopyableDatasetMetadata(new 
TestCopyableDataset(new Path(testRootDir + "/db1")));
+    WorkUnitState state3 = TestUtils.createTestWorkUnitState();
+    state3.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,true);

Review comment:
       There is a missing space after the comma here. You can run the Reformat 
Code command in IntelliJ to fix all the code style issues. If you want to fix 
only your code, you can first select it in the editor, and then run the command.

##########
File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########
@@ -88,6 +89,7 @@
   public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
   public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = 
"gobblin.copy.task.overwrite.on.commit";
   public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = 
false;
+  public static final String STAGING_DIR_SUFFIX = "/staging";

Review comment:
       There are a couple of things we need to keep in mind regarding this 
folder:
   
   1. Since we are going to put those in every dataset folder, we need to have 
a way to tell apart folders that are part of the dataset vs our temporary 
storage places.
   2. If our folder name starts with a dot ".", it will be hidden by default 
in UI and CLI tools, so users won't be distracted by those folders. 
   3. We'll need to have a way to find all such folders and delete them from 
time to time. If a job gets interrupted and does not clean up after itself, some 
automation will need to go and delete old temporary files. Otherwise those 
abandoned temp files will consume all of the storage space.
   
   I think we can go with ".tmp" to ensure that this folder is hidden and is 
clearly marked as temporary.

##########
File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########
@@ -139,17 +142,20 @@ public FileAwareInputStreamDataWriter(State state, int 
numBranches, int branchId
     URI uri = URI.create(uriStr);
     this.fs = FileSystem.get(uri, conf);
     this.fileContext = FileContext.getFileContext(uri, conf);
+    this.copyableDatasetMetadata =
+        
CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
 
     if 
(state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) 
{
       this.stagingDir = new 
Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
+    } else if 
(state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false))
 {
+      this.stagingDir = new Path(this.copyableDatasetMetadata.getDatasetURN() 
+ STAGING_DIR_SUFFIX);

Review comment:
       You can pass multiple arguments to Path to concatenate them. It's better 
to avoid concatenating path strings directly, because you can end up with 
multiple or no slashes in the path. For example, depending on user input, you 
can end up with "/first/second", "/firstsecond" or "/first//second". 

##########
File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########
@@ -139,17 +142,20 @@ public FileAwareInputStreamDataWriter(State state, int 
numBranches, int branchId
     URI uri = URI.create(uriStr);
     this.fs = FileSystem.get(uri, conf);
     this.fileContext = FileContext.getFileContext(uri, conf);
+    this.copyableDatasetMetadata =
+        
CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
 
     if 
(state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) 
{
       this.stagingDir = new 
Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
+    } else if 
(state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false))
 {
+      this.stagingDir = new Path(this.copyableDatasetMetadata.getDatasetURN() 
+ STAGING_DIR_SUFFIX);

Review comment:
       URN is a standard with a special format like "urn:linkedin:data". Looks 
like we'll have a path and not a URN here. I wonder if there are other methods 
that will return the dataset location, or whether we can rename this method to 
something like getDatasetURI() or getDatasetURL(), depending on what we can get 
here. See also the discussion on the differences between URI/URL/URN - 
https://stackoverflow.com/a/28865728




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to