aplex commented on a change in pull request #3158:
URL: https://github.com/apache/incubator-gobblin/pull/3158#discussion_r561210799
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
##########
@@ -573,6 +631,8 @@ protected int addTableDeregisterSteps(List<CopyEntity> copyEntities, String file
TableDeregisterStep deregister =
new TableDeregisterStep(table.getTTable(), this.getTargetURI(), this.getHiveRegProps());
copyEntities.add(new PostPublishStep(fileSet, Maps.<String, String> newHashMap(), deregister, stepPriority++));
+ targetDirectoryClient.deletePath(table.getDataLocation());
Review comment:
A couple of things here:
- It looks like a table can have multiple paths (see above), so a single
deletePath call on table.getDataLocation() may not cover all of them.
- The surrounding code does not actually delete files or folders; it creates
steps that are executed later on. I guess the motivation is to run those
deletions in parallel, to have retry logic, or something like that. We should
follow the same pattern and either add a step or update an existing one.
The same applies in other places.
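A minimal sketch of the deferred pattern, for illustration only. `DeferredStep`,
`PathDeleter` and `DeleteTargetPathsStep` are hypothetical stand-ins, not
Gobblin types, and paths are plain strings so the snippet compiles on its own.
A real change would presumably wrap the project's own step type in a
PostPublishStep, as the surrounding code does.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for a step that is queued now and executed later. */
interface DeferredStep {
  void execute() throws IOException;
}

/** Hypothetical stand-in for the directory client; paths are plain strings here. */
interface PathDeleter {
  void deletePath(String path) throws IOException;
}

/** Defers deletion of every data location of a table until the step runs. */
final class DeleteTargetPathsStep implements DeferredStep {
  private final PathDeleter deleter;
  private final List<String> paths;

  DeleteTargetPathsStep(PathDeleter deleter, Collection<String> paths) {
    this.deleter = deleter;
    this.paths = new ArrayList<>(paths); // defensive copy of all table paths
  }

  @Override
  public void execute() throws IOException {
    // Nothing is deleted at construction time; deletion happens only here.
    for (String path : paths) {
      deleter.deletePath(path);
    }
  }
}
```

Constructing the step has no side effects, so it can be queued next to the
deregister step and executed with whatever ordering or retry behaviour the
step runner provides.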
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
##########
@@ -389,6 +340,106 @@
}
}
+ /**
+ * Checks {@value COPY_PARTITION_FILTER_GENERATOR} in configuration to determine which class to use for hive filtering
+ * Default is to filter based on {@value COPY_PARTITIONS_FILTER_CONSTANT}, a constant regex
+ * @throws ReflectiveOperationException if the generator class in the configuration is not found
+ */
+ private Optional<String> initializePartitionFilter() throws ReflectiveOperationException {
+ if (this.dataset.getProperties().containsKey(COPY_PARTITION_FILTER_GENERATOR)) {
+ PartitionFilterGenerator generator = GobblinConstructorUtils.invokeFirstConstructor(
+ (Class<PartitionFilterGenerator>) Class.forName(
+ this.dataset.getProperties().getProperty(COPY_PARTITION_FILTER_GENERATOR)),
+ Lists.<Object>newArrayList(this.dataset.getProperties()), Lists.newArrayList());
+ Optional<String> partitionFilter = Optional.of(generator.getFilter(this.dataset));
+ log.info(String.format("Dynamic partition filter for table %s: %s.", this.dataset.table.getCompleteName(),
+ partitionFilter.get()));
+ return partitionFilter;
+ } else {
+ return Optional.fromNullable(this.dataset.getProperties().getProperty(COPY_PARTITIONS_FILTER_CONSTANT));
+ }
+ }
+
+ /**
+ * Checks {@value HIVE_PARTITION_EXTENDED_FILTER_TYPE} in configuration to initialize more granular filtering class
+ * Default is to use none
+ * @throws ReflectiveOperationException if the filter class in the configuration is not found
+ */
+ private Optional<HivePartitionExtendedFilter> initializeExtendedPartitionFilter() throws IOException, ReflectiveOperationException {
+ // Initialize extended partition filter
Review comment:
We don't need that comment; the method name already covers the meaning.
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveTargetDirectoryClient.java
##########
@@ -0,0 +1,36 @@
+package org.apache.gobblin.data.management.copy.hive;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * Determines and manages target paths for {@link HiveCopyEntityHelper}
+ * An example implementation would create target directories on cloud environments before initiating the copy
+ * By default, returns the input path (do nothing)
+ */
+public class HiveTargetDirectoryClient {
Review comment:
This could be an interface with a default implementation class.
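A rough sketch of that split, under stated assumptions: the names
`TargetDirectoryClient` and `PassThroughTargetDirectoryClient` are
hypothetical, and `java.nio.file.Path` stands in for
`org.apache.hadoop.fs.Path` so the snippet compiles without Hadoop on the
classpath.

```java
import java.io.IOException;
import java.nio.file.Path;

/** Contract for resolving and releasing target paths (hypothetical name). */
interface TargetDirectoryClient {
  /** Returns the path to copy into, creating the target directory if needed. */
  Path getOrCreateTargetPath(Path path) throws IOException;

  /** Removes the client's reference to the given target path. */
  void deletePath(Path path) throws IOException;
}

/** Default behaviour from the class under review: return the input, do nothing else. */
final class PassThroughTargetDirectoryClient implements TargetDirectoryClient {
  @Override
  public Path getOrCreateTargetPath(Path path) {
    return path; // no directory service involved, so the input path is the target
  }

  @Override
  public void deletePath(Path path) {
    // Intentionally a no-op: this implementation holds no references to release.
  }
}
```

With this shape the pass-through behaviour lives in a class whose name says
so, and a cloud-specific implementation can be plugged in through
configuration without subclassing a concrete class.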
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveTargetDirectoryClient.java
##########
@@ -0,0 +1,36 @@
+package org.apache.gobblin.data.management.copy.hive;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * Determines and manages target paths for {@link HiveCopyEntityHelper}
+ * An example implementation would create target directories on cloud environments before initiating the copy
+ * By default, returns the input path (do nothing)
+ */
+public class HiveTargetDirectoryClient {
+
+ public HiveTargetDirectoryClient(Properties properties){}
+
+ /**
+ * Creates a target directory path if it does not exist, otherwise fetch its path
+ * @param path initial target path
+ * @return the path after creating the target directory
+ * @throws IOException if creating or getting the path fails
+ */
+ public Path getOrCreateTargetPath(Path path) throws IOException {
+ return path;
+ }
+
+ /**
+ * Deletes references to the path from the directory client
+ * @param path path on target directory
+ * @throws IOException if deleting the path fails
+ */
+ public void deletePath(Path path) throws IOException {
Review comment:
This is surprising behavior: we call delete and nothing happens by default. I
would expect it to actually delete something.
Let's think about how this interface can be made more obvious.
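One possible direction, sketched only as an illustration: name the method
after what the Javadoc says it does ("deletes references to the path") and
report whether anything was released. `TrackingDirectoryClient` and
`unregisterPath` are hypothetical names, and paths are plain strings to keep
the snippet self-contained.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical client that tracks the target paths it has handed out. */
final class TrackingDirectoryClient {
  private final Set<String> registered = new HashSet<>();

  /** Registers the path with the client and returns it unchanged. */
  String getOrCreateTargetPath(String path) {
    registered.add(path);
    return path;
  }

  /**
   * Drops the client's reference to the path. No files are deleted.
   * @return true if a reference existed and was removed
   */
  boolean unregisterPath(String path) {
    return registered.remove(path);
  }
}
```

The name no longer promises a file system delete, and the boolean result
makes a call that changed nothing visible to the caller.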
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.