Will-Lo commented on a change in pull request #3158:
URL: https://github.com/apache/gobblin/pull/3158#discussion_r594903298
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterBuilder.java
##########
@@ -52,14 +53,13 @@
public synchronized static void setJobSpecificOutputPaths(State state) {
// Other tasks may have set this already
+ // If reading from the sharded dataset path, writer directory paths are stored in the workunit state
if (!StringUtils.containsIgnoreCase(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
- state.getProp(ConfigurationKeys.JOB_ID_KEY))) {
-
- state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
- state.getProp(ConfigurationKeys.JOB_ID_KEY)));
- state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
- state.getProp(ConfigurationKeys.JOB_ID_KEY)));
-
+ state.getProp(ConfigurationKeys.JOB_ID_KEY)) && !state.getPropAsBoolean(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR)) {
+ state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
Review comment:
This would be a bit hard to move, since it covers the pre-existing
scenario where the user is not using distcp with the shard configuration.
Unless we make this logic a new default handler?
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
##########
@@ -117,8 +117,9 @@ public CopyDataPublisher(State state) throws IOException {
this.fs = FileSystem.get(URI.create(uri), WriterUtils.getFsConfiguration(state));
FileAwareInputStreamDataWriterBuilder.setJobSpecificOutputPaths(state);
-
- this.writerOutputDir = new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+ // If directories are sharded by dataset, initialize writers using workunit state instead
+ this.writerOutputDir = state.getPropAsBoolean(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR) ?
Review comment:
This is only set in the shard-specific handler; it isn't set otherwise
unless we create a default handler that performs this logic. I think that
may be the way forward, considering that this logic is repeated in the
writer.
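
A rough sketch of what the default-handler idea could look like, so the
job-id path logic lives in one place instead of being repeated in the writer
and the publisher. Everything here is an assumption for discussion: the
`WorkDirHandler` interface, both handler classes, and the property key strings
are hypothetical stand-ins, and `java.util.Properties` stands in for `State`
so the snippet compiles on its own.

```java
import java.util.Properties;

final class WorkDirHandlers {
  // Hypothetical key strings; the real values live in ConfigurationKeys.
  public static final String STAGING_DIR = "example.writer.staging.dir";
  public static final String OUTPUT_DIR = "example.writer.output.dir";
  public static final String JOB_ID = "example.job.id";
  public static final String USE_DATASET_LOCAL_WORK_DIR = "example.use.dataset.local.work.dir";

  /** Hypothetical abstraction: decides how writer directories are derived. */
  public interface WorkDirHandler {
    void setOutputPaths(Properties state);
  }

  /** Pre-existing behavior: append the job id to both writer directories, at most once. */
  public static final class DefaultWorkDirHandler implements WorkDirHandler {
    @Override
    public void setOutputPaths(Properties state) {
      String jobId = state.getProperty(JOB_ID);
      String staging = state.getProperty(STAGING_DIR);
      String output = state.getProperty(OUTPUT_DIR);
      if (jobId == null || staging == null || output == null) {
        return;
      }
      // Other tasks may have set this already.
      if (staging.toLowerCase().contains(jobId.toLowerCase())) {
        return;
      }
      state.setProperty(STAGING_DIR, staging + "/" + jobId);
      state.setProperty(OUTPUT_DIR, output + "/" + jobId);
    }
  }

  /** Sharded case: directories are already stored in the workunit state, so leave them alone. */
  public static final class DatasetLocalWorkDirHandler implements WorkDirHandler {
    @Override
    public void setOutputPaths(Properties state) {
      // Intentionally a no-op.
    }
  }

  /** Picks a handler from the flag, falling back to the default behavior. */
  public static WorkDirHandler forState(Properties state) {
    boolean sharded = Boolean.parseBoolean(state.getProperty(USE_DATASET_LOCAL_WORK_DIR, "false"));
    return sharded ? new DatasetLocalWorkDirHandler() : new DefaultWorkDirHandler();
  }

  private WorkDirHandlers() {
  }
}
```

With something along these lines, both the writer builder and the publisher
could call `WorkDirHandlers.forState(state).setOutputPaths(state)` rather than
each checking the flag separately.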