sv2000 commented on a change in pull request #3158:
URL: https://github.com/apache/gobblin/pull/3158#discussion_r594780453
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
##########
@@ -370,9 +366,13 @@ public Void call() {
workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA,
((ConfigBasedDataset) this.copyableDataset).getExpectedSchema());
}
}
- if ((this.copyableDataset instanceof HiveDataset) && (state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false))) {
- workUnit.setProp(DATASET_STAGING_DIR_PATH, ((HiveDataset) this.copyableDataset).getProperties().getProperty(DATASET_STAGING_PATH));
+
+ // Ensure that the writer temporary directories are contained within the dataset shard
+ if (state.getPropAsBoolean(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR,false)) {
Review comment:
Should we just set DATASET_DESTINATION_PATH regardless of whether we
want to use DATASET_LOCAL_WORK_DIR?
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterBuilder.java
##########
@@ -52,14 +53,13 @@
public synchronized static void setJobSpecificOutputPaths(State state) {
// Other tasks may have set this already
+ // If reading from the sharded dataset path, writer directory paths are stored in the workunit state
if (!StringUtils.containsIgnoreCase(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
- state.getProp(ConfigurationKeys.JOB_ID_KEY))) {
-
- state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
- state.getProp(ConfigurationKeys.JOB_ID_KEY)));
- state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
- state.getProp(ConfigurationKeys.JOB_ID_KEY)));
-
+ state.getProp(ConfigurationKeys.JOB_ID_KEY)) && !state.getPropAsBoolean(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR)) {
+ state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
Review comment:
Can all of this be moved inside the handler?
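As a rough illustration of the idea, the sketch below keeps the whole decision in one helper: it returns early when the dataset-local work dir flag is set, and otherwise appends the job ID to the writer directories exactly once. It is only a sketch under stated assumptions, not the code in this PR: `java.util.Properties` stands in for Gobblin's `State`, the class name `WriterDirSketch` is hypothetical, and the key strings are placeholders for the `ConfigurationKeys` constants.

```java
import java.util.Properties;

// Hypothetical stand-in for the handler discussed above; not a Gobblin class.
class WriterDirSketch {
  // Placeholder key strings standing in for the ConfigurationKeys constants (assumed values).
  static final String WRITER_STAGING_DIR = "writer.staging.dir";
  static final String WRITER_OUTPUT_DIR = "writer.output.dir";
  static final String JOB_ID_KEY = "job.id";
  static final String USE_DATASET_LOCAL_WORK_DIR = "use.dataset.local.work.dir";

  static synchronized void setJobSpecificOutputPaths(Properties state) {
    // With dataset-local work dirs, the paths are assumed to come from the work unit; leave them alone.
    if (Boolean.parseBoolean(state.getProperty(USE_DATASET_LOCAL_WORK_DIR, "false"))) {
      return;
    }
    String jobId = state.getProperty(JOB_ID_KEY);
    String staging = state.getProperty(WRITER_STAGING_DIR);
    // Other tasks may have appended the job ID already (case-insensitive check, as in the diff).
    if (staging.toLowerCase().contains(jobId.toLowerCase())) {
      return;
    }
    state.setProperty(WRITER_STAGING_DIR, staging + "/" + jobId);
    state.setProperty(WRITER_OUTPUT_DIR, state.getProperty(WRITER_OUTPUT_DIR) + "/" + jobId);
  }

  public static void main(String[] args) {
    Properties state = new Properties();
    state.setProperty(JOB_ID_KEY, "job_123");
    state.setProperty(WRITER_STAGING_DIR, "/tmp/staging");
    state.setProperty(WRITER_OUTPUT_DIR, "/tmp/output");
    setJobSpecificOutputPaths(state);
    System.out.println(state.getProperty(WRITER_STAGING_DIR));
    System.out.println(state.getProperty(WRITER_OUTPUT_DIR));
  }
}
```

With the flag check living in one place, callers such as the publisher would not need their own `USE_DATASET_LOCAL_WORK_DIR` branch.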
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
##########
@@ -117,8 +117,9 @@ public CopyDataPublisher(State state) throws IOException {
this.fs = FileSystem.get(URI.create(uri), WriterUtils.getFsConfiguration(state));
FileAwareInputStreamDataWriterBuilder.setJobSpecificOutputPaths(state);
-
- this.writerOutputDir = new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+ // If directories are sharded by dataset, initialize writers using workunit state instead
+ this.writerOutputDir = state.getPropAsBoolean(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR) ?
Review comment:
Shouldn't WRITER_OUTPUT_DIR be set inside the handler? If so, we don't
need these changes here, do we?