Author: virag
Date: Sat Mar 2 00:18:56 2013
New Revision: 1451785
URL: http://svn.apache.org/r1451785
Log:
OOZIE-1235 CoordPushCheck doesn't evaluate the configuration section which is
propogated to workflow (virag)
Added:
oozie/branches/branch-4.0/core/src/test/resources/coord-action-for-action-push-check.xml
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
oozie/branches/branch-4.0/release-log.txt
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1451785&r1=1451784&r2=1451785&view=diff
==============================================================================
---
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
(original)
+++
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
Sat Mar 2 00:18:56 2013
@@ -57,7 +57,7 @@ import org.apache.openjpa.persistence.jd
// Update query for InputCheck
@NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query =
"update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp
= :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies =
:missingDependencies where w.id = :id"),
// Update query for Push-based missing dependency check
- @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query =
"update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp
= :lastModifiedTime, w.pushMissingDependencies = :pushMissingDependencies where
w.id = :id"),
+ @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query =
"update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp
= :lastModifiedTime, w.actionXml = :actionXml, w.pushMissingDependencies =
:pushMissingDependencies where w.id = :id"),
// Update query for Start
@NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update
CoordinatorActionBean w set w.status =:status, w.lastModifiedTimestamp =
:lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending
= :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage where
w.id = :id"),
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1451785&r1=1451784&r2=1451785&view=diff
==============================================================================
---
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
(original)
+++
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
Sat Mar 2 00:18:56 2013
@@ -53,6 +53,7 @@ import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
@@ -141,6 +142,9 @@ public class CoordActionInputCheckXComma
}
String pushDeps = coordAction.getPushMissingDependencies();
if (status == true && (pushDeps == null || pushDeps.length() ==
0)) {
+ String newActionXml = resolveCoordConfiguration(actionXml,
actionConf, actionId);
+ actionXml.replace(0, actionXml.length(), newActionXml);
+ coordAction.setActionXml(actionXml.toString());
coordAction.setStatus(CoordinatorAction.Status.READY);
// pass jobID to the CoordActionReadyXCommand
queue(new CoordActionReadyXCommand(coordAction.getJobId()),
100);
@@ -178,6 +182,13 @@ public class CoordActionInputCheckXComma
}
+ static String resolveCoordConfiguration(StringBuilder actionXml,
Configuration actionConf, String actionId) throws Exception {
+ Element eAction = XmlUtils.parseXml(actionXml.toString());
+ ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction,
actionConf, actionId);
+ materializeDataProperties(eAction, actionConf, eval);
+ return XmlUtils.prettyPrint(eAction).toString();
+ }
+
private boolean isTimeout(Date currentTime) {
long waitingTime = (currentTime.getTime() -
Math.max(coordAction.getNominalTime().getTime(), coordAction
.getCreatedTime().getTime()))
@@ -236,8 +247,7 @@ public class CoordActionInputCheckXComma
LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking
Latest/future");
allExist = checkUnresolvedInstances(eAction, conf);
}
- if (allExist == true) {
- materializeDataProperties(eAction, conf);
+ if (allExist) {
actionXml.replace(0, actionXml.length(),
XmlUtils.prettyPrint(eAction).toString());
}
return allExist;
@@ -253,8 +263,7 @@ public class CoordActionInputCheckXComma
* @update modify 'Action' element with appropriate list of files.
*/
@SuppressWarnings("unchecked")
- private void materializeDataProperties(Element eAction, Configuration
conf) throws Exception {
- ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, conf,
actionId);
+ static void materializeDataProperties(Element eAction, Configuration conf,
ELEvaluator eval) throws Exception {
Element configElem = eAction.getChild("action",
eAction.getNamespace()).getChild("workflow",
eAction.getNamespace()).getChild("configuration",
eAction.getNamespace());
if (configElem != null) {
@@ -272,7 +281,7 @@ public class CoordActionInputCheckXComma
* @param eval el functions evaluator
* @throws Exception thrown if unable to resolve tag value
*/
- private void resolveTagContents(String tagName, Element elem, ELEvaluator
eval) throws Exception {
+ private static void resolveTagContents(String tagName, Element elem,
ELEvaluator eval) throws Exception {
if (elem == null) {
return;
}
@@ -283,7 +292,7 @@ public class CoordActionInputCheckXComma
tagElem.addContent(updated);
}
else {
- LOG.warn(" Value NOT FOUND " + tagName);
+ XLog.getLog(CoordActionInputCheckXCommand.class).warn(" Value NOT
FOUND " + tagName);
}
}
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1451785&r1=1451784&r2=1451785&view=diff
==============================================================================
---
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
(original)
+++
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
Sat Mar 2 00:18:56 2013
@@ -32,6 +32,7 @@ import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.dependency.DependencyChecker;
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.dependency.URIHandler;
@@ -44,9 +45,12 @@ import org.apache.oozie.service.JPAServi
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
public class CoordPushDependencyCheckXCommand extends
CoordinatorXCommand<Void> {
protected String actionId;
@@ -177,7 +181,7 @@ public class CoordPushDependencyCheckXCo
return (timeOut >= 0) && (waitingTime > timeOut);
}
- protected void onAllPushDependenciesAvailable() {
+ protected void onAllPushDependenciesAvailable() throws CommandException {
coordAction.setPushMissingDependencies("");
if (coordAction.getMissingDependencies() == null ||
coordAction.getMissingDependencies().length() == 0) {
Date nominalTime = coordAction.getNominalTime();
@@ -189,6 +193,8 @@ public class CoordPushDependencyCheckXCo
+ currentTime + ", nominal=" + nominalTime);
}
else {
+ String actionXml = resolveCoordConfiguration();
+ coordAction.setActionXml(actionXml);
coordAction.setStatus(CoordinatorAction.Status.READY);
// pass jobID to the CoordActionReadyXCommand
queue(new CoordActionReadyXCommand(coordAction.getJobId()),
100);
@@ -196,6 +202,20 @@ public class CoordPushDependencyCheckXCo
}
}
+ private String resolveCoordConfiguration() throws CommandException {
+ try {
+ Configuration actionConf = new XConfiguration(new
StringReader(coordAction.getRunConf()));
+ StringBuilder actionXml = new
StringBuilder(coordAction.getActionXml());
+ String newActionXml =
CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
+ actionId);
+ actionXml.replace(0, actionXml.length(), newActionXml);
+ return actionXml.toString();
+ }
+ catch (Exception e) {
+ throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
+ }
+ }
+
protected void updateCoordAction(CoordinatorActionBean coordAction,
boolean isChangeInDependency)
throws CommandException {
coordAction.setLastModifiedTime(new Date());
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java?rev=1451785&r1=1451784&r2=1451785&view=diff
==============================================================================
---
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java
(original)
+++
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java
Sat Mar 2 00:18:56 2013
@@ -53,6 +53,7 @@ public class CoordActionUpdatePushInputC
q.setParameter("id", coordAction.getId());
q.setParameter("status", coordAction.getStatus().toString());
q.setParameter("lastModifiedTime", new Date());
+ q.setParameter("actionXml", coordAction.getActionXml());
q.setParameter("pushMissingDependencies",
coordAction.getPushMissingDependencies());
q.executeUpdate();
// Since the return type is Void, we have to return null
Modified:
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java?rev=1451785&r1=1451784&r2=1451785&view=diff
==============================================================================
---
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
(original)
+++
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
Sat Mar 2 00:18:56 2013
@@ -49,6 +49,8 @@ import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
public class TestCoordActionInputCheckXCommand extends XDataTestCase {
protected Services services;
@@ -444,6 +446,43 @@ public class TestCoordActionInputCheckXC
assertEquals(testedValue, effectiveValue);
}
+ public void testResolveCoordConfiguration() {
+ try {
+ CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+ CoordinatorJob.Status.RUNNING, false, true);
+
+ CoordinatorActionBean action =
addRecordToCoordActionTableForWaiting(job.getId(), 1,
+ CoordinatorAction.Status.WAITING,
"coord-action-for-action-input-check.xml");
+
+ createDir(getTestCaseDir() + "/2009/01/29/");
+ createDir(getTestCaseDir() + "/2009/01/22/");
+ createDir(getTestCaseDir() + "/2009/01/15/");
+ createDir(getTestCaseDir() + "/2009/01/08/");
+ sleep(3000);
+ new CoordActionInputCheckXCommand(action.getId(),
job.getId()).call();
+ sleep(3000);
+ final JPAService jpaService = Services.get().get(JPAService.class);
+ CoordActionGetJPAExecutor coordGetCmd = new
CoordActionGetJPAExecutor(action.getId());
+ CoordinatorActionBean caBean = jpaService.execute(coordGetCmd);
+
+ Element eAction = XmlUtils.parseXml(caBean.getActionXml());
+ Element configElem = eAction.getChild("action",
eAction.getNamespace())
+ .getChild("workflow",
eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+ List<?> elementList = configElem.getChildren("property",
configElem.getNamespace());
+ Element e1 = (Element) elementList.get(0);
+ Element e2 = (Element) elementList.get(1);
+ assertEquals(
+
"file://,testDir/2009/29,file://,testDir/2009/22,file://,testDir/2009/15,file://,testDir/2009/08",
+ e1.getChild("value", e1.getNamespace()).getValue());
+ assertEquals("file://,testDir/2009/29", e2.getChild("value",
e1.getNamespace()).getValue());
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ }
+ }
+
/**
* This test verifies that for a coordinator with no input dependencies
* action is not stuck in WAITING
Modified:
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1451785&r1=1451784&r2=1451785&view=diff
==============================================================================
---
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
(original)
+++
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
Sat Mar 2 00:18:56 2013
@@ -17,8 +17,12 @@
*/
package org.apache.oozie.command.coord;
+import java.util.List;
+
import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -28,6 +32,8 @@ import org.apache.oozie.service.Partitio
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -120,6 +126,43 @@ public class TestCoordPushDependencyChec
checkCoordAction(actionId, "", CoordinatorAction.Status.READY);
}
+
+ @Test
+ public void testResolveCoordConfiguration() throws Exception {
+ String db = "default";
+ String table = "tablename";
+ String newHCatDependency1 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120412;country=brazil";
+ String newHCatDependency2 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120430;country=usa";
+ String newHCatDependency = newHCatDependency1 +
CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+ populateTable(db, table);
+
+ CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+ CoordinatorJob.Status.RUNNING, false, true);
+
+ CoordinatorActionBean action1 =
addRecordToCoordActionTableForWaiting(job.getId(), 1,
+ CoordinatorAction.Status.WAITING,
"coord-action-for-action-push-check.xml", newHCatDependency);
+
+ String actionId = action1.getId();
+ checkCoordAction(actionId, newHCatDependency,
CoordinatorAction.Status.WAITING);
+
+ new CoordPushDependencyCheckXCommand(actionId).call();
+
+ CoordinatorActionBean caBean = checkCoordAction(actionId, "",
CoordinatorAction.Status.READY);
+ Element eAction = XmlUtils.parseXml(caBean.getActionXml());
+ Element configElem = eAction.getChild("action", eAction.getNamespace())
+ .getChild("workflow",
eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+ List<?> elementList = configElem.getChildren("property",
configElem.getNamespace());
+ Element e1 = (Element) elementList.get(0);
+ Element e2 = (Element) elementList.get(1);
+ assertEquals(
+
"hcat://dummyhcat:1000/db1/table1/ds=/2009-29,hcat://dummyhcat:1000/db1/table1/ds=/2009-29,"
+
+ "hcat://dummyhcat:1000/db1/table1/ds=/2009-29",
+ e1.getChild("value", e1.getNamespace()).getValue());
+ assertEquals("hcat://dummyhcat:1000/db1/table1/ds=/2009-29",
e2.getChild("value", e1.getNamespace()).getValue());
+
+ }
+
+
@Test
public void testUpdateCoordTableMultipleDepsV3() throws Exception {
// Test for two dependencies : one of them is already existing in the
Added:
oozie/branches/branch-4.0/core/src/test/resources/coord-action-for-action-push-check.xml
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/resources/coord-action-for-action-push-check.xml?rev=1451785&view=auto
==============================================================================
---
oozie/branches/branch-4.0/core/src/test/resources/coord-action-for-action-push-check.xml
(added)
+++
oozie/branches/branch-4.0/core/src/test/resources/coord-action-for-action-push-check.xml
Sat Mar 2 00:18:56 2013
@@ -0,0 +1,57 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<coordinator-app xmlns="uri:oozie:coordinator:0.2" name="NAME" frequency="1"
timezone="UTC" freq_timeunit="DAY" end_of_duration="NONE"
+instance-number="1" action-nominal-time="2009-02-01T23:59${TZ}"
action-actual-time="2010-10-01T00:00${TZ}">
+ <controls>
+ <timeout>10</timeout>
+ <concurrency>2</concurrency>
+ <execution>LIFO</execution>
+ </controls>
+ <input-events>
+ <data-in name="A" dataset="a">
+ <dataset name="a" frequency="7" initial-instance="2009-01-01T01:00${TZ}"
timezone="UTC" freq_timeunit="DAY" end_of_duration="NONE">
+
<uri-template>hcat://dummyhcat:1000/db1/table1/ds=${YEAR}-${DAY}</uri-template>
+ </dataset>
+
<uris>hcat://dummyhcat:1000/db1/table1/ds=/2009-29#hcat://dummyhcat:1000/db1/table1/ds=/2009-29#hcat://dummyhcat:1000/db1/table1/ds=/2009-29</uris>
+ </data-in>
+ </input-events>
+ <output-events>
+ <data-out name="LOCAL_A" dataset="local_a">
+ <dataset name="local_a" frequency="7"
initial-instance="2009-01-01T01:00${TZ}" timezone="UTC" freq_timeunit="DAY"
end_of_duration="NONE">
+ <uri-template>file://#testDir/${YEAR}/${DAY}</uri-template>
+ </dataset>
+ <uris>hcat://dummyhcat:1000/db1/table1/ds=/2009-29</uris>
+ <start-instance>${coord:current(-3)}</start-instance>
+ </data-out>
+ </output-events>
+ <action>
+ <workflow>
+ <app-path>hdfs:///tmp/workflows/</app-path>
+ <configuration>
+ <property>
+ <name>inputA</name>
+ <value>${coord:dataIn('A')}</value>
+ </property>
+ <property>
+ <name>inputB</name>
+ <value>${coord:dataOut('LOCAL_A')}</value>
+ </property>
+ </configuration>
+ </workflow>
+ </action>
+</coordinator-app>
Modified: oozie/branches/branch-4.0/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/release-log.txt?rev=1451785&r1=1451784&r2=1451785&view=diff
==============================================================================
--- oozie/branches/branch-4.0/release-log.txt (original)
+++ oozie/branches/branch-4.0/release-log.txt Sat Mar 2 00:18:56 2013
@@ -1,5 +1,6 @@
-- Oozie 4.0.0 (unreleased)
+OOZIE-1235 CoordPushCheck doesn't evaluate the configuration section which is
propogated to workflow (virag)
OOZIE-1203 Oozie web-console to display Bundle job definition, configuration
and log tabs (mona)
OOZIE-1237 Bump up trunk to 4.0.0-SNAPSHOT (virag)
OOZIE-561 Integrate Oozie with HCatalog