autumnust commented on a change in pull request #3054:
URL: https://github.com/apache/incubator-gobblin/pull/3054#discussion_r448641128



##########
File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########
@@ -139,17 +142,20 @@ public FileAwareInputStreamDataWriter(State state, int 
numBranches, int branchId
     URI uri = URI.create(uriStr);
     this.fs = FileSystem.get(uri, conf);
     this.fileContext = FileContext.getFileContext(uri, conf);
+    this.copyableDatasetMetadata =
+        
CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
 
     if 
(state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) 
{
       this.stagingDir = new 
Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
+    } else if 
(state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false))
 {
+      this.stagingDir = new Path(this.copyableDatasetMetadata.getDatasetURN() 
+ STAGING_DIR_SUFFIX);
     } else {
       this.stagingDir = this.writerAttemptIdOptional.isPresent() ? 
WriterUtils.getWriterStagingDir(state, numBranches, branchId, 
this.writerAttemptIdOptional.get())
           : WriterUtils.getWriterStagingDir(state, numBranches, branchId);
     }
 
     this.outputDir = getOutputDir(state);
-    this.copyableDatasetMetadata =
-        
CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
+

Review comment:
       same as above

##########
File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########
@@ -139,17 +142,20 @@ public FileAwareInputStreamDataWriter(State state, int 
numBranches, int branchId
     URI uri = URI.create(uriStr);
     this.fs = FileSystem.get(uri, conf);
     this.fileContext = FileContext.getFileContext(uri, conf);
+    this.copyableDatasetMetadata =
+        
CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
 
     if 
(state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) 
{
       this.stagingDir = new 
Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
+    } else if 
(state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false))
 {

Review comment:
       Hmm I think there's something missing here.
   
   Please take a look at 
`org.apache.gobblin.util.WriterUtils#getWriterStagingDir(org.apache.gobblin.configuration.State,
 int, int)` method, which is called below. The determination of a staging 
directory is not totally based on user-defined configuration, but some runtime 
behavior.
   
   
   Specifically within the `getWriterStagingDir`, the root path of staging dir 
comes from 
`org.apache.gobblin.configuration.ConfigurationKeys#WRITER_STAGING_DIR` which 
is a conf set dynamically in the gobblin job. In the method it also contains 
logic to differentiate different staging directory for different forks. The 
current change you made will lost that feature. 
   
   We can sync offline on this if that could help. 

##########
File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########
@@ -104,6 +106,7 @@
   private final Options.Rename renameOptions;
   private final FileContext fileContext;
 
+

Review comment:
       Can you remove this empty line if being added accidentally ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to