aplex commented on a change in pull request #3158:
URL: https://github.com/apache/gobblin/pull/3158#discussion_r589649921



##########
File path: 
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
##########
@@ -1062,4 +1061,11 @@
    */
   public static final String TASK_EVENT_METADATA_GENERATOR_CLASS_KEY = 
"gobblin.task.event.metadata.generator.class";
   public static final String DEFAULT_TASK_EVENT_METADATA_GENERATOR_CLASS_KEY = 
"nooptask";
+
+  /**
+   * Configuration for sharded directory files
+   */
+  public static final String USE_SHARDED_WRITER_DIRS = 
"gobblin.dataset.sharded.work.dirs";

Review comment:
       Naming is hard :) Looks like from the perspective of Gobblin code base, 
this flag controls the location of stating/temp folder, placing it inside the 
dataset. The typical use case is sharding, but looks like it can be used 
independently of it (e.g. when no global staging/temp place is writable).
   
   Also, those dots in the name are supposed to denote properties of an object, 
so there should be a logical object "gobblin.dataset" and 
"gobblin.dataset.sharded". The latter one seems to be a strange object to have.
   
   How about something like "gobblin.useDatasetLocalWorkDir" or 
"gobblin.putWorkDirInDatasetFolder"?

##########
File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
##########
@@ -370,9 +368,16 @@ public Void call() {
               workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA, 
((ConfigBasedDataset) this.copyableDataset).getExpectedSchema());
             }
           }
-          if ((this.copyableDataset instanceof HiveDataset) && 
(state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false))) {
-            workUnit.setProp(DATASET_STAGING_DIR_PATH, ((HiveDataset) 
this.copyableDataset).getProperties().getProperty(DATASET_STAGING_PATH));
+
+          // Ensure that the writer temporary directories are contained within 
the dataset shard
+          if ((this.copyableDataset instanceof HiveDataset) && 
(state.getPropAsBoolean(ConfigurationKeys.USE_SHARDED_WRITER_DIRS,false))) {
+            String datasetPath = ((HiveDataset) 
this.copyableDataset).getProperties().getProperty(DATASET_STAGING_PATH);
+            workUnit.setProp(ConfigurationKeys.WRITER_STAGING_DIR, datasetPath 
+ ConfigurationKeys.STAGING_DIR_DEFAULT_SUFFIX + "/" + state

Review comment:
       Looks like this new USE_SHARDED_WRITER_DIRS property is mutually 
exclusive with  ConfigurationKeys.WRITER_STAGING_DIR  and output dir. We can 
add a check somewhere in the code to either fail the job if both are present, 
or to produce a warning in logs. This will prevent confusion from SRE when they 
specify a writer dir, but it is overwritten/ignored by the code.

##########
File path: 
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
##########
@@ -210,7 +210,6 @@
   public static final String MAXIMUM_JAR_COPY_RETRY_TIMES_KEY = 
JOB_JAR_FILES_KEY + ".uploading.retry.maximum";
   public static final String USER_DEFINED_STATIC_STAGING_DIR = 
"user.defined.static.staging.dir";
   public static final String USER_DEFINED_STAGING_DIR_FLAG = 
"user.defined.staging.dir.flag";
-  public static final String IS_DATASET_STAGING_DIR_USED = 
"dataset.staging.dir.used";

Review comment:
       Note: need to update the docs/wiki pages after this is merged, to 
reflect the new name.

##########
File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
##########
@@ -130,7 +130,9 @@ public HiveDataset(FileSystem fs, HiveMetastoreClientPool 
clientPool, Table tabl
     this.tableIdentifier = this.table.getDbName() + "." + 
this.table.getTableName();
     Path tableLocation = this.table.getPath();
     if (!(this.properties.isEmpty())) {

Review comment:
       This check looks suspicious - "If we pass any kind of properties to the 
dataset, then override the staging path". Should it check a specific property 
here?

##########
File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
##########
@@ -130,7 +130,9 @@ public HiveDataset(FileSystem fs, HiveMetastoreClientPool 
clientPool, Table tabl
     this.tableIdentifier = this.table.getDbName() + "." + 
this.table.getTableName();
     Path tableLocation = this.table.getPath();
     if (!(this.properties.isEmpty())) {
-      String datasetStagingDir = 
this.properties.getProperty(COPY_TARGET_TABLE_PREFIX_REPLACEMENT) + "/" + 
tableLocation.getName();
+      // Will return staging path
+      String datasetStagingDir = 
this.properties.getProperty(COPY_TARGET_TABLE_PREFIX_REPLACEMENT) + "/" +
+          tableLocation.getParent().getName() + "/" + tableLocation.getName();

Review comment:
       I wonder if this should be more generic. For example instead of 
replacing last segment or last two segments in the path, it might need to 
replace the prefix in the destination path.
   
   Would be good to cover this with unit tests to account for various situations

##########
File path: 
gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
##########
@@ -197,6 +197,14 @@ public static void cleanTaskStagingData(State state, 
Logger logger) throws IOExc
           throw new IOException("Clean up output directory " + 
outputPath.toUri().getPath() + " failed");
         }
       }
+
+      Path errPath = new 
Path(state.getProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE));

Review comment:
       Is this cleanup related to sharding, or it is some separate improvement?
   
   Also, is this error file not part of staging/output folder that are cleaned 
up earlier in this function?

##########
File path: 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
##########
@@ -370,9 +366,25 @@ public Void call() {
               workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA, 
((ConfigBasedDataset) this.copyableDataset).getExpectedSchema());
             }
           }
-          if ((this.copyableDataset instanceof HiveDataset) && 
(state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false))) {
-            workUnit.setProp(DATASET_STAGING_DIR_PATH, ((HiveDataset) 
this.copyableDataset).getProperties().getProperty(DATASET_STAGING_PATH));
+
+          // Ensure that the writer temporary directories are contained within 
the dataset shard
+          if ((this.copyableDataset instanceof HiveDataset) && 
(state.getPropAsBoolean(ConfigurationKeys.USE_SHARDED_WRITER_DIRS,false))) {

Review comment:
       Would be good to add a bunch of unit tests, verifying that 
dataset-local-staging flag works as expect. This should help us iron out typos 
before doing end-to-end tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to