sv2000 commented on a change in pull request #3158:
URL: https://github.com/apache/gobblin/pull/3158#discussion_r594780453



##########
File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
##########
@@ -370,9 +366,13 @@ public Void call() {
               workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA, 
((ConfigBasedDataset) this.copyableDataset).getExpectedSchema());
             }
           }
-          if ((this.copyableDataset instanceof HiveDataset) && 
(state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false))) {
-            workUnit.setProp(DATASET_STAGING_DIR_PATH, ((HiveDataset) 
this.copyableDataset).getProperties().getProperty(DATASET_STAGING_PATH));
+
+          // Ensure that the writer temporary directories are contained within 
the dataset shard
+          if 
(state.getPropAsBoolean(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR,false)) {

Review comment:
       Should we just set DATASET_DESTINATION_PATH regardless of whether we 
want to use DATASET_LOCAL_WORK_DIR?

##########
File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterBuilder.java
##########
@@ -52,14 +53,13 @@
   public synchronized static void setJobSpecificOutputPaths(State state) {
 
     // Other tasks may have set this already
+    // If reading from the sharded dataset path, writer directory paths are 
stored in the workunit state
     if 
(!StringUtils.containsIgnoreCase(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
-        state.getProp(ConfigurationKeys.JOB_ID_KEY))) {
-
-      state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new 
Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
-          state.getProp(ConfigurationKeys.JOB_ID_KEY)));
-      state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new 
Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
-          state.getProp(ConfigurationKeys.JOB_ID_KEY)));
-
+        state.getProp(ConfigurationKeys.JOB_ID_KEY)) && 
!state.getPropAsBoolean(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR)) {
+        state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new 
Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),

Review comment:
       Can all of this be moved inside the handler? 

##########
File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
##########
@@ -117,8 +117,9 @@ public CopyDataPublisher(State state) throws IOException {
     this.fs = FileSystem.get(URI.create(uri), 
WriterUtils.getFsConfiguration(state));
 
     FileAwareInputStreamDataWriterBuilder.setJobSpecificOutputPaths(state);
-
-    this.writerOutputDir = new 
Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+    // If directories are sharded by dataset, initialize writers using 
workunit state instead
+    this.writerOutputDir = 
state.getPropAsBoolean(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR) ?

Review comment:
       Shouldn't WRITER_OUTPUT_DIR be set inside the handler? If so, we don't 
need these changes here, do we?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to