autumnust commented on a change in pull request #3054: URL: https://github.com/apache/incubator-gobblin/pull/3054#discussion_r448641128
##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########

@@ -139,17 +142,20 @@ public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId
     URI uri = URI.create(uriStr);
     this.fs = FileSystem.get(uri, conf);
     this.fileContext = FileContext.getFileContext(uri, conf);
+    this.copyableDatasetMetadata =
+        CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
     if (state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) {
       this.stagingDir = new Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
+    } else if (state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false)) {
+      this.stagingDir = new Path(this.copyableDatasetMetadata.getDatasetURN() + STAGING_DIR_SUFFIX);
     } else {
       this.stagingDir = this.writerAttemptIdOptional.isPresent() ? WriterUtils.getWriterStagingDir(state, numBranches, branchId, this.writerAttemptIdOptional.get())
           : WriterUtils.getWriterStagingDir(state, numBranches, branchId);
     }
     this.outputDir = getOutputDir(state);
-    this.copyableDatasetMetadata =
-        CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
+

Review comment:
same as above

##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########

@@ -139,17 +142,20 @@ public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId
     URI uri = URI.create(uriStr);
     this.fs = FileSystem.get(uri, conf);
     this.fileContext = FileContext.getFileContext(uri, conf);
+    this.copyableDatasetMetadata =
+        CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
     if (state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) {
       this.stagingDir = new Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
+    } else if (state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false)) {

Review comment:
Hmm, I think something is missing here. Please take a look at the `org.apache.gobblin.util.WriterUtils#getWriterStagingDir(org.apache.gobblin.configuration.State, int, int)` method, which is called below. The staging directory is not determined by user-defined configuration alone; it also depends on runtime behavior. Specifically, within `getWriterStagingDir`, the root path of the staging dir comes from `org.apache.gobblin.configuration.ConfigurationKeys#WRITER_STAGING_DIR`, which is a conf set dynamically in the Gobblin job. The method also contains logic that gives different forks different staging directories. Your current change will lose that feature. We can sync offline on this if that would help.

##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########

@@ -104,6 +106,7 @@
   private final Options.Rename renameOptions;
   private final FileContext fileContext;
+

Review comment:
Can you remove this empty line if it was added accidentally?
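
On the staging-directory comment above, here is a rough sketch of one way a dataset-defined staging dir could still stay distinct per fork. It is not Gobblin code: the class `StagingDirSketch`, the method `forkAwareStagingDir`, the `fork_<branchId>` subdirectory naming, and the example suffix value are all hypothetical, and it uses plain strings rather than Hadoop `Path` so it runs on its own.

```java
// Hypothetical sketch, not Gobblin code. Every name below is illustrative.
public class StagingDirSketch {

  /**
   * Builds a staging directory rooted at the dataset URN.
   * When the job has more than one branch (fork), a per-branch
   * subdirectory is appended so forks do not share a staging dir.
   */
  public static String forkAwareStagingDir(String datasetUrn, String suffix,
      int numBranches, int branchId) {
    String base = datasetUrn + suffix;
    // Assumption: one subdirectory per fork is enough to keep forks apart.
    return numBranches > 1 ? base + "/fork_" + branchId : base;
  }

  public static void main(String[] args) {
    String urn = "/data/example/dataset"; // made-up dataset URN
    String suffix = "/.staging";          // made-up stand-in for STAGING_DIR_SUFFIX
    System.out.println(forkAwareStagingDir(urn, suffix, 1, 0));
    System.out.println(forkAwareStagingDir(urn, suffix, 2, 0));
    System.out.println(forkAwareStagingDir(urn, suffix, 2, 1));
  }
}
```

With a single branch the dataset-based path is used unchanged; with several branches each fork gets its own subdirectory, which is the property the `else if` branch in the diff would otherwise drop.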