Repository: oozie
Updated Branches:
  refs/heads/master e16448289 -> 6f0d303c9


OOZIE-1741 Add new coord EL function to get input partitions value string 
(satish.mittal via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6f0d303c
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6f0d303c
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6f0d303c

Branch: refs/heads/master
Commit: 6f0d303c9086214a472cbace5362f21aec0b943b
Parents: e164482
Author: Rohini Palaniswamy <[email protected]>
Authored: Mon Jun 23 13:35:50 2014 -0700
Committer: Rohini Palaniswamy <[email protected]>
Committed: Mon Jun 23 13:35:50 2014 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/coord/HCatELFunctions.java |  47 +++++
 core/src/main/resources/oozie-default.xml       |   2 +
 .../apache/oozie/coord/TestHCatELFunctions.java |  57 ++++++
 .../site/twiki/CoordinatorFunctionalSpec.twiki  | 186 +++++++++++++++++++
 release-log.txt                                 |   1 +
 .../java/org/apache/oozie/util/HCatURI.java     |  29 +++
 6 files changed, 322 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/6f0d303c/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java 
b/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
index e5f0146..9a36af0 100644
--- a/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
+++ b/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
@@ -115,6 +115,12 @@ public class HCatELFunctions {
         return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'");
     }
 
+    public static String ph1_coord_dataInPartitions_echo(String dataInName, 
String type) {
+        // Validate the data event name first (isValidDataEvent is defined outside this hunk), then echo the call back unresolved.
+        isValidDataEvent(dataInName);
+        return echoUnResolved("dataInPartitions", "'" + dataInName + "', '" + 
type + "'");
+    }
+
     public static String ph1_coord_dataOutPartitionValue_echo(String 
dataOutName, String partition) {
         // Checking if the dataIn/dataOut is correct?
         isValidDataEvent(dataOutName);
@@ -266,6 +272,47 @@ public class HCatELFunctions {
     }
 
     /**
+     * Resolves the complete partition value string of an HCat data-in event for the workflow job. <p/>
+     * Reads two evaluator-level variables: <p/> A) .datain.<DATAIN_NAME> holds the data-in HCat URI; <p/>
+     * B) .datain.<DATAIN_NAME>.unresolved tells whether an EL function (e.g. latest) is still unresolved. <p/>
+     * If B is true, the original EL expression is echoed back; otherwise the partition value string built
+     * from the URI is returned. A null URI is logged as a warning and yields null.
+     *
+     * @param dataInName : DataIn name
+     * @param type : action type; only hive-export is supported
+     * @return the partition value string, the echoed EL expression if unresolved, or null if the URI is null
+     * @throws RuntimeException if the URI is non-null and the type is not hive-export, if the URI splits into
+     *         more than one entry on CoordELFunctions.DIR_SEPARATOR, or if the URI cannot be parsed as an HCatURI
+     */
+    public static String ph3_coord_dataInPartitions(String dataInName, String 
type) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String uri = (String) eval.getVariable(".datain." + dataInName);
+        Boolean unresolved = (Boolean) eval.getVariable(".datain." + 
dataInName + ".unresolved");
+        if (unresolved != null && unresolved.booleanValue() == true) {
+            return "${coord:dataInPartitions('" + dataInName + "', '" + type + 
"')}";
+        }
+        String partitionValue = null;
+        if (uri != null) {
+            if (type.equals("hive-export")) {
+                String[] uriList = uri.split(CoordELFunctions.DIR_SEPARATOR);
+                if (uriList.length > 1) {
+                    throw new RuntimeException("Multiple partitions not 
supported for hive-export type. Dataset name: "
+                        + dataInName + " URI: " + uri);
+                }
+                try {
+                    partitionValue = new 
HCatURI(uri).toPartitionValueString(type);
+                }
+                catch (URISyntaxException e) {
+                    throw new RuntimeException("Parsing exception for HCatURI 
" + uri, e);
+                }
+            } else {
+                  throw new RuntimeException("Unsupported type: " + type + " 
dataset name: " + dataInName);
+            }
+        }
+        else {
+            XLog.getLog(HCatELFunctions.class).warn("URI is null");
+            return null;
+        }
+        return partitionValue;
+    }
+
+    /**
      * Used to specify the MAXIMUM value of an HCat partition which is input 
dependency for workflow job.<p/> Look for two evaluator-level
      * variables <p/> A) .datain.<DATAIN_NAME> B) 
.datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
      * HCat URIs. <p/> B defines whether there are any unresolved EL-function 
(i.e latest) <p/> If there are something

http://git-wip-us.apache.org/repos/asf/oozie/blob/6f0d303c/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index b944d3d..1edc5c9 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -879,6 +879,7 @@
             
coord:dataInPartitionFilter=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionFilter_echo,
             
coord:dataInPartitionMin=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionMin_echo,
             
coord:dataInPartitionMax=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionMax_echo,
+            
coord:dataInPartitions=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitions_echo,
             
coord:dataOutPartitions=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataOutPartitions_echo,
             
coord:dataOutPartitionValue=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataOutPartitionValue_echo
         </value>
@@ -1167,6 +1168,7 @@
             
coord:dataInPartitionFilter=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionFilter,
             
coord:dataInPartitionMin=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionMin,
             
coord:dataInPartitionMax=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionMax,
+            
coord:dataInPartitions=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitions,
             
coord:dataOutPartitions=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataOutPartitions,
             
coord:dataOutPartitionValue=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataOutPartitionValue
         </value>

http://git-wip-us.apache.org/repos/asf/oozie/blob/6f0d303c/core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java 
b/core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java
index f63b682..bb14130 100644
--- a/core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java
+++ b/core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java
@@ -274,6 +274,38 @@ public class TestHCatELFunctions extends XHCatTestCase {
     }
 
     /**
+     * Test HCat dataInPartitions EL function (phase 1), which echoes back the EL
+     * function itself. Also checks that an undefined data-in name and a call
+     * with a single argument both make evaluation throw.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testDataInPartitionsPh1() throws Exception {
+        init("coord-job-submit-data");
+        String expr = "${coord:dataInPartitions('ABC', 'hive-export')}";
+        // positive test: with ABC registered as a data-in, the expression is echoed back unchanged
+        eval.setVariable("oozie.dataname.ABC", "data-in");
+        assertEquals("${coord:dataInPartitions('ABC', 'hive-export')}", 
CoordELFunctions.evalAndWrap(eval, expr));
+        // negative test: ABCD has not been registered as a data event, so evaluation must throw
+        expr = "${coord:dataInPartitions('ABCD', 'hive-export')}";
+        try {
+            CoordELFunctions.evalAndWrap(eval, expr);
+            fail("should throw exception because Data-in is not defined");
+        }
+        catch (Exception ex) {
+        }
+        // negative test: the one-argument form must be rejected even once ABCD is registered
+        expr = "${coord:dataInPartitions('ABCD')}";
+        eval.setVariable("oozie.dataname.ABCD", "data-in");
+        try {
+            CoordELFunctions.evalAndWrap(eval, expr);
+            fail("should throw exception because EL function requires 2 
parameters");
+        }
+        catch (Exception ex) {
+        }
+    }
+
+    /**
      * Test HCat dataOutPartition EL function (phase 1) which echo back the EL
      * function itself
      *
@@ -498,6 +530,31 @@ public class TestHCatELFunctions extends XHCatTestCase {
         assertTrue(res.equals("20"));
     }
 
+    /**
+     * Test dataInPartitions EL function (phase 3), which for the hive-export type returns the
+     * complete partition value string of a single partition and throws for any other type.
+     * Either ordering of the two key=value pairs is accepted by the assertion.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testDataInPartitions() throws Exception {
+        init("coord-action-start");
+        String expr = "${coord:dataInPartitions('ABC', 'hive-export')}";
+        eval.setVariable(".datain.ABC", 
"hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us");
+        eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
+        String res = CoordELFunctions.evalAndWrap(eval, expr);
+        assertTrue(res.equals("datastamp='20120230',region='us'") || 
res.equals("region='us',datastamp='20120230'"));
+        // negative test: any type other than hive-export must make the EL function throw
+        try {
+            expr = "${coord:dataInPartitions('ABC', 'invalid-type')}";
+            eval.setVariable(".datain.ABC", 
"hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us");
+            eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
+            res = CoordELFunctions.evalAndWrap(eval, expr);
+            fail("EL function should throw exception because of invalid type");
+        } catch (Exception e) {
+        }
+    }
+
     private void init(String tag) throws Exception {
         init(tag, "hdfs://localhost:9000/user/" + getTestUser() + 
"/US/${YEAR}/${MONTH}/${DAY}");
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/6f0d303c/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki 
b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
index 961377e..ffc4752 100644
--- a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
+++ b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
@@ -2855,6 +2855,192 @@ C = foreach B generate foo, bar;
 store C into 'myOutputDatabase.myOutputTable' using 
org.apache.hcatalog.pig.HCatStorer('region=APAC,datestamp=20090102');
 </blockquote>
 
+---++++ 6.8.8 coord:dataInPartitions(String name, String type) EL function
+
+The =${coord:dataInPartitions(String name, String type)}= EL function resolves 
to a list of partition key-value
+pairs for the input-event dataset. Currently the only type supported is 
'hive-export'. The 'hive-export' type
+supports only one partition instance and it can be used to create the complete 
partition value string that can
+be used in a hive query for partition export/import.
+
+The example below illustrates a hive export-import job triggered by a 
coordinator, using the EL functions for HCat database,
+table, input partitions. The example replicates the hourly processed data 
across hive tables.
+
+*%GREEN% Example: %ENDCOLOR%*
+
+#HCatHiveExampleOne
+
+*Coordinator application definition:*
+
+<blockquote>
+    <coordinator-app xmlns="uri:oozie:coordinator:0.3" name="app-coord"
+    frequency="${coord:hours(1)}" start="2014-03-28T08:00Z"
+    end="2030-01-01T00:00Z" timezone="UTC">
+
+    <datasets>
+        <dataset name="Stats-1" frequency="${coord:hours(1)}"
+            initial-instance="2014-03-28T08:00Z" timezone="UTC">
+            
<uri-template>hcat://foo:11002/myInputDatabase1/myInputTable1/year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR}
+            </uri-template>
+        </dataset>
+        <dataset name="Stats-2" frequency="${coord:hours(1)}"
+            initial-instance="2014-03-28T08:00Z" timezone="UTC">
+            
<uri-template>hcat://foo:11002/myInputDatabase2/myInputTable2/year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR}
+            </uri-template>
+        </dataset>
+    </datasets>
+    <input-events>
+        <data-in name="processed-logs-1" dataset="Stats-1">
+            <instance>${coord:current(0)}</instance>
+        </data-in>
+    </input-events>
+    <output-events>
+        <data-out name="processed-logs-2" dataset="Stats-2">
+            <instance>${coord:current(0)}</instance>
+        </data-out>
+    </output-events>
+    <action>
+      <workflow>
+        <app-path>hdfs://bar:8020/usr/joe/logsreplicator-wf</app-path>
+        <configuration>
+          <property>
+            <name>EXPORT_DB</name>
+            <value>${coord:databaseIn('processed-logs-1')}</value>
+          </property>
+          <property>
+            <name>EXPORT_TABLE</name>
+            <value>${coord:tableIn('processed-logs-1')}</value>
+          </property>
+          <property>
+            <name>IMPORT_DB</name>
+            <value>${coord:databaseOut('processed-logs-2')}</value>
+          </property>
+          <property>
+            <name>IMPORT_TABLE</name>
+            <value>${coord:tableOut('processed-logs-2')}</value>
+          </property>
+          <property>
+            <name>EXPORT_PARTITION</name>
+            <value>${coord:dataInPartitions('processed-logs-1', 
'hive-export')}</value>
+          </property>
+          <property>
+            <name>EXPORT_PATH</name>
+            
<value>hdfs://bar:8020/staging/${coord:formatTime(coord:nominalTime(), 
'yyyy-MM-dd-HH')}/data</value>
+          </property>
+        </configuration>
+      </workflow>
+    </action>
+</coordinator-app>
+</blockquote>
+
+Parameterizing the input/output databases and tables using the corresponding 
EL function as shown will make them
+available in the hive action of the workflow 'logsreplicator-wf'.
+
+Each coordinator action will use as input events the hourly instances of the 
'processed-logs-1' dataset. The
+=${coord:dataInPartitions(String name, String type)}= function enables the 
coordinator application to pass the
+partition corresponding to hourly dataset instances to the workflow job 
triggered by the coordinator action.
+The workflow passes this partition value to the hive export script, which 
exports the hourly partition from the source
+database to the staging location referred to as =EXPORT_PATH=. The hive import 
script imports the hourly partition from
+the =EXPORT_PATH= staging location into the target database.
+
+#HCatWorkflow
+
+*Workflow definition:*
+
+<blockquote>
+<workflow-app xmlns="uri:oozie:workflow:0.3" name="logsreplicator-wf">
+    <start to="table-export"/>
+    <action name="table-export">
+        <hive:hive xmlns:hive="uri:oozie:hive-action:0.2" 
xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <script>${wf:appPath()}/scripts/table-export.hql</script>
+            <param>sourceDatabase=${EXPORT_DB}</param>
+            <param>sourceTable=${EXPORT_TABLE}</param>
+            <param>sourcePartition=${EXPORT_PARTITION}</param>
+            <param>sourceStagingDir=${EXPORT_PATH}</param>
+        </hive:hive>
+        <ok to="table-import"/>
+        <error to="fail"/>
+    </action>
+    <action name="table-import">
+        <hive:hive xmlns:hive="uri:oozie:hive-action:0.2" 
xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <script>${wf:appPath()}/scripts/table-import.hql</script>
+            <param>targetDatabase=${IMPORT_DB}</param>
+            <param>targetTable=${IMPORT_TABLE}</param>
+            <param>targetPartition=${EXPORT_PARTITION}</param>
+            <param>sourceStagingDir=${EXPORT_PATH}</param>
+        </hive:hive>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <kill name="fail">
+        <message>
+            Workflow failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}]
+        </message>
+    </kill>
+    <end name="end"/>
+</workflow-app>
+</blockquote>
+
+Ensure that the following jars are in the classpath, with versions corresponding 
to the hcatalog installation:
+hcatalog-core.jar, webhcat-java-client.jar, hive-common.jar, hive-exec.jar, 
hive-metastore.jar, hive-serde.jar,
+ libfb303.jar. The hive-site.xml needs to be present in the classpath as well.
+
+*Example Hive Export script:*
+The following script exports a particular Hive table partition into staging 
location, where the partition value
+ is computed through =${coord:dataInPartitions(String name, String type)}= EL 
function.
+<blockquote>
+export table ${sourceDatabase}.${sourceTable} partition (${sourcePartition}) 
to '${sourceStagingDir}';
+</blockquote>
+
+For example, for the 2014-03-28T08:00Z run with the given dataset instances 
and ${coord:dataInPartitions('processed-logs-1', 'hive-export')},
+the above Hive script with resolved values would look like:
+<blockquote>
+export table myInputDatabase1.myInputTable1 partition 
(year='2014',month='03',day='28',hour='08') to 
'hdfs://bar:8020/staging/2014-03-28-08/data';
+</blockquote>
+
+*Example Hive Import script:*
+The following script imports a particular Hive table partition from staging 
location, where the partition value is computed
+ through =${coord:dataInPartitions(String name, String type)}= EL function.
+<blockquote>
+use ${targetDatabase};
+alter table ${targetTable} drop if exists partition (${targetPartition});
+import table ${targetTable} partition (${targetPartition}) from 
'${sourceStagingDir}';
+</blockquote>
+
+For example, for the 2014-03-28T08:00Z run with the given dataset instances 
and ${coord:dataInPartitions('processed-logs-1', 'hive-export')},
+the above Hive script with resolved values would look like:
+
+<blockquote>
+use myInputDatabase2;
+alter table myInputTable2 drop if exists partition 
(year='2014',month='03',day='28',hour='08');
+import table myInputTable2 partition 
(year='2014',month='03',day='28',hour='08') from 
'hdfs://bar:8020/staging/2014-03-28-08/data';
+</blockquote>
+
 
 ---+++ 6.9. Parameterization of Coordinator Application
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/6f0d303c/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 01d54a4..d1b209c 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1741 Add new coord EL function to get input partitions value string 
(satish.mittal via rohini) 
 OOZIE-1817 Oozie timers are not biased (rkanter)
 OOZIE-1807 Make bundle change command synchronous (puru via rohini)
 OOZIE-1678 HA support for SLA (ryota)

http://git-wip-us.apache.org/repos/asf/oozie/blob/6f0d303c/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
----------------------------------------------------------------------
diff --git a/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java 
b/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
index d797f9b..4bc5048 100644
--- a/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
+++ b/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
@@ -260,6 +260,35 @@ public class HCatURI {
         return filter.toString();
     }
 
+    /**
+     * Get the entire partition value string from the partition map.
+     * For type hive-export, every entry is rendered as key=value with the value wrapped in
+     * PARTITION_VALUE_QUOTE, and entries are joined with commas, giving a string
+     * that can be used in a Hive query for partition export/import.
+     *
+     * @param type output format; only hive-export is supported
+     * @return partition value string
+     * @throws RuntimeException if the type is not hive-export
+     */
+    public String toPartitionValueString(String type) {
+        StringBuilder value = new StringBuilder();
+        if (type.equals("hive-export")) {
+            String comparator = "=";
+            String separator = ",";
+            for (Map.Entry<String, String> entry : partitions.entrySet()) {
+                if (value.length() > 1) { // true once an entry has been appended; NOTE(review): length() > 0 would state the intent more directly
+                    value.append(separator);
+                }
+                value.append(entry.getKey());
+                value.append(comparator);
+                value.append(PARTITION_VALUE_QUOTE);
+                value.append(entry.getValue());
+                value.append(PARTITION_VALUE_QUOTE);
+            }
+        } else {
+            throw new RuntimeException("Unsupported type: " + type);
+        }
+        return value.toString();
+    }
+
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();

Reply via email to