Repository: incubator-gobblin Updated Branches: refs/heads/master 8284bb76b -> e67799948
[GOBBLIN-266] Improve Hive Task setup Closes #2117 from ibuenros/hive-materializer-2 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e6779994 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e6779994 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e6779994 Branch: refs/heads/master Commit: e677999481f1311cf7bdc0ebe207a816e0e28382 Parents: 8284bb7 Author: ibuenros <[email protected]> Authored: Fri Sep 29 10:37:14 2017 -0700 Committer: Issac Buenrostro <[email protected]> Committed: Fri Sep 29 10:37:14 2017 -0700 ---------------------------------------------------------------------- .../materializer/CopyTableQueryGenerator.java | 2 ++ .../HiveMaterializerQueryGenerator.java | 13 ++++++- .../MaterializeTableQueryGenerator.java | 1 + .../QueryBasedMaterializerQueryGenerator.java | 1 + .../conversion/hive/task/HiveTask.java | 37 ++++++++++++++++++++ 5 files changed, 53 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e6779994/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java index 8ff0913..9fdf9df 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java @@ -50,6 +50,8 @@ public class CopyTableQueryGenerator extends HiveMaterializerFromEntityQueryGene @Override public List<String> generateQueries() { + ensureParentOfStagingPathExists(); + List<String> hiveQueries = Lists.newArrayList(); /* * Setting partition mode to 'nonstrict' is needed to improve readability of the code. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e6779994/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java index 803e043..eb43bc7 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java @@ -34,6 +34,7 @@ import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiv import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit; import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; import org.apache.gobblin.hive.HiveMetastoreClientPool; +import org.apache.hadoop.fs.Path; import lombok.extern.slf4j.Slf4j; @@ -80,11 +81,21 @@ public abstract class HiveMaterializerQueryGenerator implements QueryGenerator { /** * Returns hive queries to be run as a part of a hive task. * This does not include publish queries. - * @return */ @Override public abstract List<String> generateQueries(); + protected void ensureParentOfStagingPathExists() { + try { + Path parentStagingPath = new Path(this.stagingDataLocation).getParent(); + if (!this.fs.exists(parentStagingPath)) { + this.fs.mkdirs(parentStagingPath); + } + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + /** * Retuens a QueryBasedHivePublishEntity which includes publish level queries and cleanup commands. * @return QueryBasedHivePublishEntity http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e6779994/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java index fa91d15..d15cc0d 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java @@ -43,6 +43,7 @@ public class MaterializeTableQueryGenerator extends HiveMaterializerFromEntityQu @Override public List<String> generateQueries() { + ensureParentOfStagingPathExists(); return Lists.newArrayList(HiveConverterUtils.generateStagingCTASStatementFromSelectStar( new HiveDatasetFinder.DbAndTable(this.outputDatabaseName, this.stagingTableName), new HiveDatasetFinder.DbAndTable(this.inputDbName, this.inputTableName), http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e6779994/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java index 37a50b3..3b3f904 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java @@ -54,6 +54,7 @@ public class QueryBasedMaterializerQueryGenerator extends HiveMaterializerQueryG @Override public List<String> generateQueries() { + ensureParentOfStagingPathExists(); return Lists.newArrayList(HiveConverterUtils.generateStagingCTASStatement( new HiveDatasetFinder.DbAndTable(this.outputDatabaseName, this.stagingTableName), this.sourceQuery, http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e6779994/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java index 16a2028..26e357f 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import com.google.common.base.Splitter; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -55,6 +56,9 @@ import lombok.extern.slf4j.Slf4j; */ public abstract class HiveTask extends BaseAbstractTask { private static final String USE_WATERMARKER_KEY = "internal.hiveTask.useWatermarker"; + private static final String ADD_FILES = "internal.hiveTask.addFiles"; + private static final String ADD_JARS = "internal.hiveTask.addJars"; + private static final String SETUP_QUERIES = "internal.hiveTask.setupQueries"; /** * Disable Hive watermarker. This is necessary when there is no concrete source table where watermark can be inferred. @@ -63,6 +67,27 @@ public abstract class HiveTask extends BaseAbstractTask { state.setProp(USE_WATERMARKER_KEY, Boolean.toString(false)); } + /** + * Add the input file to the Hive session before running the task. + */ + public static void addFile(State state, String file) { + state.setProp(ADD_FILES, state.getProp(ADD_FILES, "") + "," + file); + } + + /** + * Add the input jar to the Hive session before running the task. + */ + public static void addJar(State state, String jar) { + state.setProp(ADD_JARS, state.getProp(ADD_JARS, "") + "," + jar); + } + + /** + * Run the specified setup query on the Hive session before running the task. + */ + public static void addSetupQuery(State state, String query) { + state.setProp(SETUP_QUERIES, state.getProp(SETUP_QUERIES, "") + ";" + query); + } + protected final TaskContext taskContext; protected final WorkUnitState workUnitState; protected final HiveWorkUnit workUnit; @@ -71,6 +96,10 @@ public abstract class HiveTask extends BaseAbstractTask { protected final QueryBasedHivePublishEntity publishEntity; protected final HiveJdbcConnector hiveJdbcConnector; + private final List<String> addFiles; + private final List<String> addJars; + private final List<String> setupQueries; + public HiveTask(TaskContext taskContext) { super(taskContext); this.taskContext = taskContext; @@ -85,6 +114,10 @@ public abstract class HiveTask extends BaseAbstractTask { } catch (SQLException se) { throw new RuntimeException("Error in creating JDBC Connector", se); } + + this.addFiles = this.workUnitState.getPropAsList(ADD_FILES, ""); + this.addJars = this.workUnitState.getPropAsList(ADD_JARS, ""); + this.setupQueries = Splitter.on(";").trimResults().omitEmptyStrings().splitToList(this.workUnitState.getProp(SETUP_QUERIES, "")); } /** @@ -166,6 +199,10 @@ public abstract class HiveTask extends BaseAbstractTask { public void run() { try { List<String> queries = generateHiveQueries(); + + this.hiveJdbcConnector.executeStatements(Lists.transform(this.addFiles, file -> "ADD FILE " + file).toArray(new String[]{})); + this.hiveJdbcConnector.executeStatements(Lists.transform(this.addJars, file -> "ADD JAR " + file).toArray(new String[]{})); + this.hiveJdbcConnector.executeStatements(this.setupQueries.toArray(new String[]{})); this.hiveJdbcConnector.executeStatements(queries.toArray(new String[queries.size()])); super.run(); } catch (Exception e) {
