Will-Lo commented on a change in pull request #3158:
URL: https://github.com/apache/gobblin/pull/3158#discussion_r594903298



##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterBuilder.java
##########
@@ -52,14 +53,13 @@
   public synchronized static void setJobSpecificOutputPaths(State state) {
 
     // Other tasks may have set this already
+    // If reading from the sharded dataset path, writer directory paths are stored in the workunit state
     if (!StringUtils.containsIgnoreCase(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
-        state.getProp(ConfigurationKeys.JOB_ID_KEY))) {
-
-      state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
-          state.getProp(ConfigurationKeys.JOB_ID_KEY)));
-      state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
-          state.getProp(ConfigurationKeys.JOB_ID_KEY)));
-
+        state.getProp(ConfigurationKeys.JOB_ID_KEY)) && !state.getPropAsBoolean(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR)) {
+        state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),

Review comment:
       This would be a bit hard to move, since it covers the pre-existing
scenario where the user is not using distcp with the shard configuration.
Unless we make this logic a new default handler?

##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
##########
@@ -117,8 +117,9 @@ public CopyDataPublisher(State state) throws IOException {
     this.fs = FileSystem.get(URI.create(uri), WriterUtils.getFsConfiguration(state));
 
     FileAwareInputStreamDataWriterBuilder.setJobSpecificOutputPaths(state);
-
-    this.writerOutputDir = new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+    // If directories are sharded by dataset, initialize writers using workunit state instead
+    this.writerOutputDir = state.getPropAsBoolean(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR) ?

Review comment:
       This is only set in the shard-specific handler; it is not set
otherwise, unless we create a default handler that performs this logic. I
think that may be the way forward, considering that this logic is repeated
in the writer.
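
To make the "default handler" idea concrete, here is a rough sketch of how the
path logic could be split between a default and a shard-specific handler. It is
only an illustration, not code from this PR: the WorkDirHandlers class, the
WorkDirHandler interface, both handler classes, and the property key strings
are all hypothetical, and java.util.Properties stands in for Gobblin's State so
that the snippet compiles on its own.

```java
import java.util.Locale;
import java.util.Objects;
import java.util.Properties;

// Hypothetical sketch: none of these names exist in Gobblin.
final class WorkDirHandlers {
  // Placeholder keys standing in for the ConfigurationKeys constants in the diff.
  static final String STAGING_DIR = "writer.staging.dir";
  static final String OUTPUT_DIR = "writer.output.dir";
  static final String JOB_ID = "job.id";
  static final String USE_DATASET_LOCAL_WORK_DIR = "use.dataset.local.work.dir";

  private WorkDirHandlers() {}

  /** Decides how the writer staging/output directories are derived. */
  interface WorkDirHandler {
    void apply(Properties state);
  }

  /** Pre-existing behavior: append the job id to both directories, once. */
  static final class DefaultWorkDirHandler implements WorkDirHandler {
    @Override
    public void apply(Properties state) {
      String jobId = Objects.requireNonNull(state.getProperty(JOB_ID), JOB_ID);
      String staging = Objects.requireNonNull(state.getProperty(STAGING_DIR), STAGING_DIR);
      String output = Objects.requireNonNull(state.getProperty(OUTPUT_DIR), OUTPUT_DIR);

      // Other tasks may have set this already, so skip if the job id is present.
      if (staging.toLowerCase(Locale.ROOT).contains(jobId.toLowerCase(Locale.ROOT))) {
        return;
      }
      state.setProperty(STAGING_DIR, staging + "/" + jobId);
      state.setProperty(OUTPUT_DIR, output + "/" + jobId);
    }
  }

  /** Sharded case: directories are assumed to be in the state already, so do nothing. */
  static final class ShardedWorkDirHandler implements WorkDirHandler {
    @Override
    public void apply(Properties state) {
      // Intentionally a no-op in this sketch.
    }
  }

  /** Picks a handler from the flag, so writer and publisher share one code path. */
  static WorkDirHandler select(Properties state) {
    boolean sharded =
        Boolean.parseBoolean(state.getProperty(USE_DATASET_LOCAL_WORK_DIR, "false"));
    return sharded ? new ShardedWorkDirHandler() : new DefaultWorkDirHandler();
  }
}
```

With something along these lines, both the writer builder and the publisher
could call WorkDirHandlers.select(state).apply(state) instead of each checking
the flag themselves.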




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

