aplex commented on a change in pull request #3158:
URL: https://github.com/apache/gobblin/pull/3158#discussion_r589649921
##########
File path:
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
##########
@@ -1062,4 +1061,11 @@
*/
public static final String TASK_EVENT_METADATA_GENERATOR_CLASS_KEY =
"gobblin.task.event.metadata.generator.class";
public static final String DEFAULT_TASK_EVENT_METADATA_GENERATOR_CLASS_KEY =
"nooptask";
+
+ /**
+ * Configuration for sharded directory files
+ */
+ public static final String USE_SHARDED_WRITER_DIRS =
"gobblin.dataset.sharded.work.dirs";
Review comment:
Naming is hard :) From the perspective of the Gobblin code base, this flag
controls the location of the staging/temp folder, placing it inside the
dataset. The typical use case is sharding, but it looks like the flag can be
used independently of it (e.g. when no global staging/temp location is writable).
Also, the dots in the name are supposed to denote properties of an object,
so this name implies the logical objects "gobblin.dataset" and
"gobblin.dataset.sharded". The latter seems a strange object to have.
How about something like "gobblin.useDatasetLocalWorkDir" or
"gobblin.putWorkDirInDatasetFolder"?
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
##########
@@ -370,9 +368,16 @@ public Void call() {
workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA,
((ConfigBasedDataset) this.copyableDataset).getExpectedSchema());
}
}
- if ((this.copyableDataset instanceof HiveDataset) &&
(state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false))) {
- workUnit.setProp(DATASET_STAGING_DIR_PATH, ((HiveDataset)
this.copyableDataset).getProperties().getProperty(DATASET_STAGING_PATH));
+
+ // Ensure that the writer temporary directories are contained within
the dataset shard
+ if ((this.copyableDataset instanceof HiveDataset) &&
(state.getPropAsBoolean(ConfigurationKeys.USE_SHARDED_WRITER_DIRS,false))) {
+ String datasetPath = ((HiveDataset)
this.copyableDataset).getProperties().getProperty(DATASET_STAGING_PATH);
+ workUnit.setProp(ConfigurationKeys.WRITER_STAGING_DIR, datasetPath
+ ConfigurationKeys.STAGING_DIR_DEFAULT_SUFFIX + "/" + state
Review comment:
It looks like this new USE_SHARDED_WRITER_DIRS property is mutually
exclusive with ConfigurationKeys.WRITER_STAGING_DIR and the output dir. We can
add a check somewhere in the code to either fail the job if both are present
or log a warning. This would prevent confusion for SREs who specify a writer
dir, only to have it overwritten/ignored by the code.
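A rough sketch of what such a check could look like, written against plain
java.util.Properties rather than Gobblin's State so it stands alone. The class
and method names are hypothetical, and the "writer.staging.dir" and
"writer.output.dir" values are my assumption about what the corresponding
ConfigurationKeys constants resolve to; only the sharded flag key is taken
from this diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class WriterDirConflictCheck {
  // Key as it appears in this PR's diff.
  static final String USE_SHARDED_WRITER_DIRS = "gobblin.dataset.sharded.work.dirs";
  // Assumed values of ConfigurationKeys.WRITER_STAGING_DIR and WRITER_OUTPUT_DIR.
  static final String WRITER_STAGING_DIR = "writer.staging.dir";
  static final String WRITER_OUTPUT_DIR = "writer.output.dir";

  /** Returns the explicitly configured writer dir keys that the flag would override. */
  static List<String> conflictingKeys(Properties props) {
    List<String> conflicts = new ArrayList<>();
    boolean flagOn = Boolean.parseBoolean(props.getProperty(USE_SHARDED_WRITER_DIRS, "false"));
    if (!flagOn) {
      return conflicts; // nothing gets overridden when the flag is off
    }
    for (String key : new String[] {WRITER_STAGING_DIR, WRITER_OUTPUT_DIR}) {
      if (props.getProperty(key) != null) {
        conflicts.add(key);
      }
    }
    return conflicts;
  }

  /** Fail-fast variant; a caller preferring a warning could log the list instead. */
  static void validate(Properties props) {
    List<String> conflicts = conflictingKeys(props);
    if (!conflicts.isEmpty()) {
      throw new IllegalArgumentException(
          USE_SHARDED_WRITER_DIRS + " is enabled, so these settings would be ignored: " + conflicts);
    }
  }
}
```

Returning the list of conflicting keys separately from validate() keeps the
fail-vs-warn decision with the caller.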
##########
File path:
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
##########
@@ -210,7 +210,6 @@
public static final String MAXIMUM_JAR_COPY_RETRY_TIMES_KEY =
JOB_JAR_FILES_KEY + ".uploading.retry.maximum";
public static final String USER_DEFINED_STATIC_STAGING_DIR =
"user.defined.static.staging.dir";
public static final String USER_DEFINED_STAGING_DIR_FLAG =
"user.defined.staging.dir.flag";
- public static final String IS_DATASET_STAGING_DIR_USED =
"dataset.staging.dir.used";
Review comment:
Note: the docs/wiki pages need to be updated after this is merged to
reflect the new name.
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
##########
@@ -130,7 +130,9 @@ public HiveDataset(FileSystem fs, HiveMetastoreClientPool
clientPool, Table tabl
this.tableIdentifier = this.table.getDbName() + "." +
this.table.getTableName();
Path tableLocation = this.table.getPath();
if (!(this.properties.isEmpty())) {
Review comment:
This check looks suspicious - "If we pass any kind of properties to the
dataset, then override the staging path". Should it check for a specific
property here instead?
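To illustrate what I mean, a minimal sketch using plain java.util.Properties.
The class name, method name, and key string are placeholders for illustration;
real code would use the existing COPY_TARGET_TABLE_PREFIX_REPLACEMENT constant.
Note that with the isEmpty() check, a non-empty Properties that lacks this key
makes getProperty() return null, and Java string concatenation would then
produce a path starting with "null/".

```java
import java.util.Optional;
import java.util.Properties;

public class StagingPrefixLookup {
  // Placeholder key for illustration only; stands in for the value of
  // COPY_TARGET_TABLE_PREFIX_REPLACEMENT.
  static final String PREFIX_REPLACEMENT_KEY = "example.copy.target.table.prefixReplacement";

  /**
   * Returns the configured staging prefix only when that specific property is
   * present and non-blank, instead of treating any non-empty Properties as a
   * signal to override the staging path.
   */
  static Optional<String> stagingPrefix(Properties props) {
    String value = props.getProperty(PREFIX_REPLACEMENT_KEY);
    if (value == null || value.trim().isEmpty()) {
      return Optional.empty();
    }
    return Optional.of(value.trim());
  }
}
```

The caller would then compute the dataset staging dir only when the Optional
is present.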
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
##########
@@ -130,7 +130,9 @@ public HiveDataset(FileSystem fs, HiveMetastoreClientPool
clientPool, Table tabl
this.tableIdentifier = this.table.getDbName() + "." +
this.table.getTableName();
Path tableLocation = this.table.getPath();
if (!(this.properties.isEmpty())) {
- String datasetStagingDir =
this.properties.getProperty(COPY_TARGET_TABLE_PREFIX_REPLACEMENT) + "/" +
tableLocation.getName();
+ // Will return staging path
+ String datasetStagingDir =
this.properties.getProperty(COPY_TARGET_TABLE_PREFIX_REPLACEMENT) + "/" +
+ tableLocation.getParent().getName() + "/" + tableLocation.getName();
Review comment:
I wonder if this should be more generic. For example, instead of
replacing the last segment or the last two segments of the path, it might
need to replace the prefix of the destination path.
It would be good to cover this with unit tests that account for various situations.
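As a sketch of the more generic approach, here is prefix replacement on plain
strings (the real code would work with Hadoop Path objects). The class and
method names are hypothetical, and failing when the prefix does not match is
just one possible policy.

```java
public class PathPrefixReplacer {
  /**
   * Replaces oldPrefix at the start of path with newPrefix, matching only on
   * whole path segments, so "/data" does not match "/database/x".
   */
  static String replacePrefix(String path, String oldPrefix, String newPrefix) {
    String p = stripTrailingSlash(path);
    String from = stripTrailingSlash(oldPrefix);
    String to = stripTrailingSlash(newPrefix);
    if (p.equals(from)) {
      return to;
    }
    if (p.startsWith(from + "/")) {
      // Keep everything after the old prefix, including the leading "/".
      return to + p.substring(from.length());
    }
    throw new IllegalArgumentException("Path " + path + " is not under prefix " + oldPrefix);
  }

  private static String stripTrailingSlash(String s) {
    return (s.length() > 1 && s.endsWith("/")) ? s.substring(0, s.length() - 1) : s;
  }
}
```

With this shape, replacePrefix("/data/db/table", "/data", "/staging") gives
"/staging/db/table" regardless of how many segments follow the prefix, and the
unit tests can simply enumerate paths of different depths.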
##########
File path:
gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
##########
@@ -197,6 +197,14 @@ public static void cleanTaskStagingData(State state,
Logger logger) throws IOExc
throw new IOException("Clean up output directory " +
outputPath.toUri().getPath() + " failed");
}
}
+
+ Path errPath = new
Path(state.getProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE));
Review comment:
Is this cleanup related to sharding, or is it a separate improvement?
Also, isn't this error file part of the staging/output folders that are
cleaned up earlier in this function?
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
##########
@@ -370,9 +366,25 @@ public Void call() {
workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA,
((ConfigBasedDataset) this.copyableDataset).getExpectedSchema());
}
}
- if ((this.copyableDataset instanceof HiveDataset) &&
(state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false))) {
- workUnit.setProp(DATASET_STAGING_DIR_PATH, ((HiveDataset)
this.copyableDataset).getProperties().getProperty(DATASET_STAGING_PATH));
+
+ // Ensure that the writer temporary directories are contained within
the dataset shard
+ if ((this.copyableDataset instanceof HiveDataset) &&
(state.getPropAsBoolean(ConfigurationKeys.USE_SHARDED_WRITER_DIRS,false))) {
Review comment:
It would be good to add a set of unit tests verifying that the
dataset-local-staging flag works as expected. This should help us iron out
typos before doing end-to-end tests.