Repository: oozie Updated Branches: refs/heads/master e16448289 -> 6f0d303c9
OOZIE-1741 Add new coord EL function to get input partitions value string (satish.mittal via rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6f0d303c Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6f0d303c Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6f0d303c Branch: refs/heads/master Commit: 6f0d303c9086214a472cbace5362f21aec0b943b Parents: e164482 Author: Rohini Palaniswamy <[email protected]> Authored: Mon Jun 23 13:35:50 2014 -0700 Committer: Rohini Palaniswamy <[email protected]> Committed: Mon Jun 23 13:35:50 2014 -0700 ---------------------------------------------------------------------- .../org/apache/oozie/coord/HCatELFunctions.java | 47 +++++ core/src/main/resources/oozie-default.xml | 2 + .../apache/oozie/coord/TestHCatELFunctions.java | 57 ++++++ .../site/twiki/CoordinatorFunctionalSpec.twiki | 186 +++++++++++++++++++ release-log.txt | 1 + .../java/org/apache/oozie/util/HCatURI.java | 29 +++ 6 files changed, 322 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/6f0d303c/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java b/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java index e5f0146..9a36af0 100644 --- a/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java +++ b/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java @@ -115,6 +115,12 @@ public class HCatELFunctions { return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'"); } + public static String ph1_coord_dataInPartitions_echo(String dataInName, String type) { + // Checking if the dataIn/dataOut is correct? 
+ isValidDataEvent(dataInName); + return echoUnResolved("dataInPartitions", "'" + dataInName + "', '" + type + "'"); + } + public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) { // Checking if the dataIn/dataOut is correct? isValidDataEvent(dataOutName); @@ -266,6 +272,47 @@ public class HCatELFunctions { } /** + * Used to specify the entire HCat partition defining input for workflow job. <p/> Look for two evaluator-level + * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the data-in HCat URI. + * <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something unresolved, + * this function will echo back the original function <p/> otherwise it sends the partition. + * + * @param dataInName : DataIn name + * @param type : for action type: hive-export + */ + public static String ph3_coord_dataInPartitions(String dataInName, String type) { + ELEvaluator eval = ELEvaluator.getCurrent(); + String uri = (String) eval.getVariable(".datain." + dataInName); + Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); + if (unresolved != null && unresolved.booleanValue() == true) { + return "${coord:dataInPartitions('" + dataInName + "', '" + type + "')}"; + } + String partitionValue = null; + if (uri != null) { + if (type.equals("hive-export")) { + String[] uriList = uri.split(CoordELFunctions.DIR_SEPARATOR); + if (uriList.length > 1) { + throw new RuntimeException("Multiple partitions not supported for hive-export type. 
Dataset name: " + + dataInName + " URI: " + uri); + } + try { + partitionValue = new HCatURI(uri).toPartitionValueString(type); + } + catch (URISyntaxException e) { + throw new RuntimeException("Parsing exception for HCatURI " + uri, e); + } + } else { + throw new RuntimeException("Unsupported type: " + type + " dataset name: " + dataInName); + } + } + else { + XLog.getLog(HCatELFunctions.class).warn("URI is null"); + return null; + } + return partitionValue; + } + + /** * Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something http://git-wip-us.apache.org/repos/asf/oozie/blob/6f0d303c/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index b944d3d..1edc5c9 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -879,6 +879,7 @@ coord:dataInPartitionFilter=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionFilter_echo, coord:dataInPartitionMin=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionMin_echo, coord:dataInPartitionMax=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionMax_echo, + coord:dataInPartitions=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitions_echo, coord:dataOutPartitions=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataOutPartitions_echo, coord:dataOutPartitionValue=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataOutPartitionValue_echo </value> @@ -1167,6 +1168,7 @@ coord:dataInPartitionFilter=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionFilter, 
coord:dataInPartitionMin=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionMin, coord:dataInPartitionMax=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionMax, + coord:dataInPartitions=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitions, coord:dataOutPartitions=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataOutPartitions, coord:dataOutPartitionValue=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataOutPartitionValue </value> http://git-wip-us.apache.org/repos/asf/oozie/blob/6f0d303c/core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java b/core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java index f63b682..bb14130 100644 --- a/core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java +++ b/core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java @@ -274,6 +274,38 @@ public class TestHCatELFunctions extends XHCatTestCase { } /** + * Test HCat dataInPartition EL function (phase 1) which echo back the EL + * function itself + * + * @throws Exception + */ + @Test + public void testDataInPartitionsPh1() throws Exception { + init("coord-job-submit-data"); + String expr = "${coord:dataInPartitions('ABC', 'hive-export')}"; + // +ve test + eval.setVariable("oozie.dataname.ABC", "data-in"); + assertEquals("${coord:dataInPartitions('ABC', 'hive-export')}", CoordELFunctions.evalAndWrap(eval, expr)); + // -ve test + expr = "${coord:dataInPartitions('ABCD', 'hive-export')}"; + try { + CoordELFunctions.evalAndWrap(eval, expr); + fail("should throw exception because Data-in is not defined"); + } + catch (Exception ex) { + } + // -ve test + expr = "${coord:dataInPartitions('ABCD')}"; + eval.setVariable("oozie.dataname.ABCD", "data-in"); + try { + CoordELFunctions.evalAndWrap(eval, expr); + fail("should throw exception because EL function 
requires 2 parameters"); + } + catch (Exception ex) { + } + } + + /** * Test HCat dataOutPartition EL function (phase 1) which echo back the EL * function itself * @@ -498,6 +530,31 @@ public class TestHCatELFunctions extends XHCatTestCase { assertTrue(res.equals("20")); } + /** + * Test dataInPartitions EL function (phase 3) which returns the complete partition value string of a single partition + * in case of hive-export type. + * + * @throws Exception + */ + @Test + public void testDataInPartitions() throws Exception { + init("coord-action-start"); + String expr = "${coord:dataInPartitions('ABC', 'hive-export')}"; + eval.setVariable(".datain.ABC", "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us"); + eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE); + String res = CoordELFunctions.evalAndWrap(eval, expr); + assertTrue(res.equals("datastamp='20120230',region='us'") || res.equals("region='us',datastamp='20120230'")); + // -ve test; execute EL function with any other type than hive-export + try { + expr = "${coord:dataInPartitions('ABC', 'invalid-type')}"; + eval.setVariable(".datain.ABC", "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us"); + eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE); + res = CoordELFunctions.evalAndWrap(eval, expr); + fail("EL function should throw exception because of invalid type"); + } catch (Exception e) { + } + } + private void init(String tag) throws Exception { init(tag, "hdfs://localhost:9000/user/" + getTestUser() + "/US/${YEAR}/${MONTH}/${DAY}"); } http://git-wip-us.apache.org/repos/asf/oozie/blob/6f0d303c/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki index 961377e..ffc4752 100644 --- a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki +++ 
b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki @@ -2855,6 +2855,192 @@ C = foreach B generate foo, bar; store C into 'myOutputDatabase.myOutputTable' using org.apache.hcatalog.pig.HCatStorer('region=APAC,datestamp=20090102'); </blockquote> +---++++ 6.8.8 coord:dataInPartitions(String name, String type) EL function + +The =${coord:dataInPartitions(String name, String type)}= EL function resolves to a list of partition key-value +pairs for the input-event dataset. Currently the only type supported is 'hive-export'. The 'hive-export' type +supports only one partition instance and it can be used to create the complete partition value string that can +be used in a hive query for partition export/import. + +The example below illustrates a hive export-import job triggered by a coordinator, using the EL functions for HCat database, +table, input partitions. The example replicates the hourly processed data across hive tables. + +*%GREEN% Example: %ENDCOLOR%* + +#HCatHiveExampleOne + +*Coordinator application definition:* + +<blockquote> + <coordinator-app xmlns="uri:oozie:coordinator:0.3" name="app-coord" + frequency="${coord:hours(1)}" start="2014-03-28T08:00Z" + end="2030-01-01T00:00Z" timezone="UTC"> + + <datasets> + <dataset name="Stats-1" frequency="${coord:hours(1)}" + initial-instance="2014-03-28T08:00Z" timezone="UTC"> + <uri-template>hcat://foo:11002/myInputDatabase1/myInputTable1/year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR} + </uri-template> + </dataset> + <dataset name="Stats-2" frequency="${coord:hours(1)}" + initial-instance="2014-03-28T08:00Z" timezone="UTC"> + <uri-template>hcat://foo:11002/myInputDatabase2/myInputTable2/year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR} + </uri-template> + </dataset> + </datasets> + <input-events> + <data-in name="processed-logs-1" dataset="Stats-1"> + <instance>${coord:current(0)}</instance> + </data-in> + </input-events> + <output-events> + <data-out name="processed-logs-2" dataset="Stats-2"> + 
<instance>${coord:current(0)}</instance> + </data-out> + </output-events> + <action> + <workflow> + <app-path>hdfs://bar:8020/usr/joe/logsreplicator-wf</app-path> + <configuration> + <property> + <name>EXPORT_DB</name> + <value>${coord:databaseIn('processed-logs-1')}</value> + </property> + <property> + <name>EXPORT_TABLE</name> + <value>${coord:tableIn('processed-logs-1')}</value> + </property> + <property> + <name>IMPORT_DB</name> + <value>${coord:databaseOut('processed-logs-2')}</value> + </property> + <property> + <name>IMPORT_TABLE</name> + <value>${coord:tableOut('processed-logs-2')}</value> + </property> + <property> + <name>EXPORT_PARTITION</name> + <value>${coord:dataInPartitions('processed-logs-1', 'hive-export')}</value> + </property> + <property> + <name>EXPORT_PATH</name> + <value>hdfs://bar:8020/staging/${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH')}/data</value> + </property> + </configuration> + </workflow> + </action> +</coordinator-app> +</blockquote> + +Parameterizing the input/output databases and tables using the corresponding EL function as shown will make them +available in the hive action of the workflow 'logsreplicator-wf'. + +Each coordinator action will use as input events the hourly instances of the 'processed-logs-1' dataset. The +=${coord:dataInPartitions(String name, String type)}= function enables the coordinator application to pass the +partition corresponding to hourly dataset instances to the workflow job triggered by the coordinator action. +The workflow passes this partition value to the hive export script that exports the hourly partition from source +database to the staging location referred as =EXPORT_PATH=. The hive import script imports the hourly partition from +=EXPORT_PATH= staging location into the target database. 
+ +#HCatWorkflow + +*Workflow definition:* + +<blockquote> +<workflow-app xmlns="uri:oozie:workflow:0.3" name="logsreplicator-wf"> + <start to="table-export"/> + <action name="table-export"> + <hive:hive xmlns:hive="uri:oozie:hive-action:0.2" xmlns="uri:oozie:hive-action:0.2"> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml> + <configuration> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>oozie.launcher.mapred.job.priority</name> + <value>${jobPriority}</value> + </property> + </configuration> + <script>${wf:appPath()}/scripts/table-export.hql</script> + <param>sourceDatabase=${EXPORT_DB}</param> + <param>sourceTable=${EXPORT_TABLE}</param> + <param>sourcePartition=${EXPORT_PARTITION}</param> + <param>sourceStagingDir=${EXPORT_PATH}</param> + </hive:hive> + <ok to="table-import"/> + <error to="fail"/> + </action> + <action name="table-import"> + <hive:hive xmlns:hive="uri:oozie:hive-action:0.2" xmlns="uri:oozie:hive-action:0.2"> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml> + <configuration> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>oozie.launcher.mapred.job.priority</name> + <value>${jobPriority}</value> + </property> + </configuration> + <script>${wf:appPath()}/scripts/table-import.hql</script> + <param>targetDatabase=${IMPORT_DB}</param> + <param>targetTable=${IMPORT_TABLE}</param> + <param>targetPartition=${EXPORT_PARTITION}</param> + <param>sourceStagingDir=${EXPORT_PATH}</param> + </hive:hive> + <ok to="end"/> + <error to="fail"/> + </action> + <kill name="fail"> + <message> + Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + </message> + </kill> + <end name="end"/> +</workflow-app> +</blockquote> + +Ensure that the 
following jars are in classpath, with versions corresponding to hcatalog installation: +hcatalog-core.jar, webhcat-java-client.jar, hive-common.jar, hive-exec.jar, hive-metastore.jar, hive-serde.jar, + libfb303.jar. The hive-site.xml needs to be present in classpath as well. + +*Example Hive Export script:* +The following script exports a particular Hive table partition into staging location, where the partition value + is computed through =${coord:dataInPartitions(String name, String type)}= EL function. +<blockquote> +export table ${sourceDatabase}.${sourceTable} partition (${sourcePartition}) to '${sourceStagingDir}'; +</blockquote> + +For example, for the 2014-03-28T08:00Z run with the given dataset instances and ${coord:dataInPartitions( +'processed-logs-1', 'hive-export')}, the above Hive script with resolved values would look like: +<blockquote> +export table myInputDatabase1.myInputTable1 partition (year='2014',month='03',day='28',hour='08') to 'hdfs://bar:8020/staging/2014-03-28-08/data'; +</blockquote> + +*Example Hive Import script:* +The following script imports a particular Hive table partition from staging location, where the partition value is computed + through =${coord:dataInPartitions(String name, String type)}= EL function. +<blockquote> +use ${targetDatabase}; +alter table ${targetTable} drop if exists partition (${targetPartition}); +import table ${targetTable} partition (${targetPartition}) from '${sourceStagingDir}'; +</blockquote> + +For example, for the 2014-03-28T08:00Z run with the given dataset instances and ${coord:dataInPartitions( +'processed-logs-1', 'hive-export')}, the above Hive script with resolved values would look like: + +<blockquote> +use myInputDatabase2; +alter table myInputTable2 drop if exists partition (year='2014',month='03',day='28',hour='08'); +import table myInputTable2 partition (year='2014',month='03',day='28',hour='08') from 'hdfs://bar:8020/staging/2014-03-28-08/data'; +</blockquote> + ---+++ 6.9.
Parameterization of Coordinator Application http://git-wip-us.apache.org/repos/asf/oozie/blob/6f0d303c/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 01d54a4..d1b209c 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1741 Add new coord EL function to get input partitions value string (satish.mittal via rohini) OOZIE-1817 Oozie timers are not biased (rkanter) OOZIE-1807 Make bundle change command synchronous (puru via rohini) OOZIE-1678 HA support for SLA (ryota) http://git-wip-us.apache.org/repos/asf/oozie/blob/6f0d303c/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java ---------------------------------------------------------------------- diff --git a/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java b/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java index d797f9b..4bc5048 100644 --- a/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java +++ b/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java @@ -260,6 +260,35 @@ public class HCatURI { return filter.toString(); } + /** + * Get the entire partition value string from partition map. + * In case of type hive-export, it can be used to create entire partition value string + * that can be used in Hive query for partition export/import. 
+ * + * type hive-export + * @return partition value string + */ + public String toPartitionValueString(String type) { + StringBuilder value = new StringBuilder(); + if (type.equals("hive-export")) { + String comparator = "="; + String separator = ","; + for (Map.Entry<String, String> entry : partitions.entrySet()) { + if (value.length() > 1) { + value.append(separator); + } + value.append(entry.getKey()); + value.append(comparator); + value.append(PARTITION_VALUE_QUOTE); + value.append(entry.getValue()); + value.append(PARTITION_VALUE_QUOTE); + } + } else { + throw new RuntimeException("Unsupported type: " + type); + } + return value.toString(); + } + @Override public String toString() { StringBuilder sb = new StringBuilder();
