Author: virag
Date: Fri Mar 15 07:01:06 2013
New Revision: 1456785
URL: http://svn.apache.org/r1456785
Log:
OOZIE-1267 Dryrun option for push missing deps (virag)
Added:
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
oozie/trunk/release-log.txt
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1456785&r1=1456784&r2=1456785&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
Fri Mar 15 07:01:06 2013
@@ -251,7 +251,7 @@ public class CoordActionInputCheckXComma
return checkResolvedUris(eAction, existList, nonExistList, conf);
}
- private boolean checkUnResolvedInput(StringBuilder actionXml,
Configuration conf) throws Exception {
+ protected boolean checkUnResolvedInput(StringBuilder actionXml,
Configuration conf) throws Exception {
Element eAction = XmlUtils.parseXml(actionXml.toString());
LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking
Latest/future");
boolean allExist = checkUnresolvedInstances(eAction, conf);
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1456785&r1=1456784&r2=1456785&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
Fri Mar 15 07:01:06 2013
@@ -36,6 +36,7 @@ import org.apache.oozie.coord.CoordUtils
import org.apache.oozie.coord.CoordinatorJobException;
import org.apache.oozie.coord.SyncCoordAction;
import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.dependency.DependencyChecker;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.dependency.URIHandler.DependencyType;
@@ -482,23 +483,52 @@ public class CoordCommandUtils {
return XmlUtils.prettyPrint(eAction).toString();
}
else {
- String action = XmlUtils.prettyPrint(eAction).toString();
- StringBuilder actionXml = new StringBuilder(action);
- Configuration actionConf = new XConfiguration(new
StringReader(actionBean.getRunConf()));
- if (actionBean.getPushMissingDependencies() != null) {
-
DependencyChecker.checkForAvailability(actionBean.getPushMissingDependencies(),
actionConf, true);
- }
- if (actionBean.getMissingDependencies() != null) {
- CoordActionInputCheckXCommand coordActionInput = new
CoordActionInputCheckXCommand(actionBean.getId(),
- actionBean.getJobId());
- StringBuilder existList = new StringBuilder();
- StringBuilder nonExistList = new StringBuilder();
- StringBuilder nonResolvedList = new StringBuilder();
- getResolvedList(actionBean.getMissingDependencies(),
nonExistList, nonResolvedList);
- coordActionInput.checkInput(actionXml, existList,
nonExistList, actionConf);
+ return dryRunCoord(eAction, actionBean);
+ }
+ }
+
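+ /**
+ * @param eAction the actionXml related element
+ * @param actionBean the coordinator action bean
+ * @return the action XML, with its configuration resolved only if all dependencies are available
+ * @throws Exception propagated from reading the run configuration or from the dependency checks
+ */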
+ static String dryRunCoord(Element eAction, CoordinatorActionBean
actionBean) throws Exception {
+ String action = XmlUtils.prettyPrint(eAction).toString();
+ StringBuilder actionXml = new StringBuilder(action);
+ Configuration actionConf = new XConfiguration(new
StringReader(actionBean.getRunConf()));
+
+ boolean isPushDepAvailable = true;
+ if (actionBean.getPushMissingDependencies() != null) {
+ ActionDependency actionDep =
DependencyChecker.checkForAvailability(
+ actionBean.getPushMissingDependencies(), actionConf, true);
+ if (actionDep.getMissingDependencies().size() != 0) {
+ isPushDepAvailable = false;
}
- return actionXml.toString();
+
}
+ boolean isPullDepAvailable = true;
+ CoordActionInputCheckXCommand coordActionInput = new
CoordActionInputCheckXCommand(actionBean.getId(),
+ actionBean.getJobId());
+ if (actionBean.getMissingDependencies() != null) {
+ StringBuilder existList = new StringBuilder();
+ StringBuilder nonExistList = new StringBuilder();
+ StringBuilder nonResolvedList = new StringBuilder();
+ getResolvedList(actionBean.getMissingDependencies(), nonExistList,
nonResolvedList);
+ isPullDepAvailable = coordActionInput.checkInput(actionXml,
existList, nonExistList, actionConf);
+ }
+
+ if (isPullDepAvailable && isPushDepAvailable) {
+ // Check latest/future instances only once push and pull dependencies are available
+ boolean isLatestFutureDepAvailable =
coordActionInput.checkUnResolvedInput(actionXml, actionConf);
+ if (isLatestFutureDepAvailable) {
+ String newActionXml =
CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
+ actionBean.getId());
+ actionXml.replace(0, actionXml.length(), newActionXml);
+ }
+ }
+
+ return actionXml.toString();
}
/**
Added:
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java?rev=1456785&view=auto
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
(added)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
Fri Mar 15 07:01:06 2013
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.text.ParseException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.UUIDService;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public class TestCoordCommandUtils extends XDataTestCase {
+ protected Services services;
+
+ protected String getProcessingTZ() {
+ return DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT;
+ }
+
+ private String hcatServer;
+
+ @Before
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ setSystemProperty(DateUtils.OOZIE_PROCESSING_TIMEZONE_KEY,
getProcessingTZ());
+ services = super.setupServicesForHCatalog();
+ services.init();
+ cleanUpDBTables();
+ hcatServer = getMetastoreAuthority();
+ }
+
+ @After
+ @Override
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ @Test
+ public void testDryRunPushDependencies() {
+ try {
+ CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+ CoordinatorJob.Status.RUNNING, false, true);
+ Path appPath = new Path(getFsTestCaseDir(), "coord");
+ // actionXml only to check whether coord conf got resolved or not
+ String actionXml = getCoordActionXml(appPath,
"coord-action-for-action-input-check.xml");
+ CoordinatorActionBean actionBean =
createCoordinatorActionBean(job);
+
+ String db = "default";
+ String table = "tablename";
+ String hcatDependency = getPushMissingDependencies(db, table);
+ actionBean.setPushMissingDependencies(hcatDependency);
+
+ Element eAction = createActionElement(actionXml);
+ String newactionXml = CoordCommandUtils.dryRunCoord(eAction,
actionBean);
+ eAction = XmlUtils.parseXml(newactionXml);
+
+ Element configElem = eAction.getChild("action",
eAction.getNamespace())
+ .getChild("workflow",
eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+ List<?> elementList = configElem.getChildren("property",
configElem.getNamespace());
+ Element e1 = (Element) elementList.get(0);
+ Element e2 = (Element) elementList.get(1);
+ // Make sure conf is not resolved as dependencies are not met
+ assertEquals("${coord:dataIn('A')}", e1.getChild("value",
e1.getNamespace()).getValue());
+ assertEquals("${coord:dataOut('LOCAL_A')}", e2.getChild("value",
e2.getNamespace()).getValue());
+
+ // Make the dependencies available
+ populateTable(db, table);
+ newactionXml = CoordCommandUtils.dryRunCoord(eAction, actionBean);
+
+ eAction = XmlUtils.parseXml(newactionXml);
+ configElem = eAction.getChild("action", eAction.getNamespace())
+ .getChild("workflow",
eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+ elementList = configElem.getChildren("property",
configElem.getNamespace());
+ e1 = (Element) elementList.get(0);
+ e2 = (Element) elementList.get(1);
+ // Check for resolved conf
+ assertEquals(
+
"file://,testDir/2009/29,file://,testDir/2009/22,file://,testDir/2009/15,file://,testDir/2009/08",
+ e1.getChild("value", e1.getNamespace()).getValue());
+ assertEquals("file://,testDir/2009/29", e2.getChild("value",
e1.getNamespace()).getValue());
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testDryRunPullDeps() {
+
+ try {
+ CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-matd-hcat.xml",
+ CoordinatorJob.Status.RUNNING, false, true);
+
+ Path appPath = new Path(getFsTestCaseDir(), "coord");
+ // actionXml only to check whether coord conf got resolved or not
+ String actionXml = getCoordActionXml(appPath,
"coord-action-for-action-input-check.xml");
+
+ CoordinatorActionBean actionBean =
createCoordinatorActionBean(job);
+ String testDir = getTestCaseDir();
+ String missDeps = getPullMissingDependencies(testDir);
+ actionBean.setMissingDependencies(missDeps);
+
+ Element eAction = createActionElement(actionXml);
+
+ String newactionXml = CoordCommandUtils.dryRunCoord(eAction,
actionBean);
+
+ eAction = XmlUtils.parseXml(newactionXml);
+ Element configElem = eAction.getChild("action",
eAction.getNamespace())
+ .getChild("workflow",
eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+ List<?> elementList = configElem.getChildren("property",
configElem.getNamespace());
+ Element e1 = (Element) elementList.get(0);
+ Element e2 = (Element) elementList.get(1);
+ // Make sure conf is not resolved as dependencies are not met
+ assertEquals("${coord:dataIn('A')}", e1.getChild("value",
e1.getNamespace()).getValue());
+ assertEquals("${coord:dataOut('LOCAL_A')}", e2.getChild("value",
e2.getNamespace()).getValue());
+
+ // Make the dependencies available
+ createDir(testDir + "/2009/29/");
+ createDir(testDir + "/2009/22/");
+ createDir(testDir + "/2009/15/");
+ createDir(testDir + "/2009/08/");
+ sleep(1000);
+
+ newactionXml = CoordCommandUtils.dryRunCoord(eAction, actionBean);
+
+ eAction = XmlUtils.parseXml(newactionXml);
+ configElem = eAction.getChild("action", eAction.getNamespace())
+ .getChild("workflow",
eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+ elementList = configElem.getChildren("property",
configElem.getNamespace());
+ e1 = (Element) elementList.get(0);
+ e2 = (Element) elementList.get(1);
+ // Check for resolved conf
+ assertEquals(
+
"file://,testDir/2009/29,file://,testDir/2009/22,file://,testDir/2009/15,file://,testDir/2009/08",
+ e1.getChild("value", e1.getNamespace()).getValue());
+ assertEquals("file://,testDir/2009/29", e2.getChild("value",
e1.getNamespace()).getValue());
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testDryRunPullAndPushDeps() {
+
+ try {
+ CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-matd-hcat.xml",
+ CoordinatorJob.Status.RUNNING, false, true);
+
+ Path appPath = new Path(getFsTestCaseDir(), "coord");
+ // actionXml only to check whether coord conf got resolved or not
+ String actionXml = getCoordActionXml(appPath,
"coord-action-for-action-input-check.xml");
+
+ CoordinatorActionBean actionBean =
createCoordinatorActionBean(job);
+ String testDir = getTestCaseDir();
+ String missDeps = getPullMissingDependencies(testDir);
+ actionBean.setMissingDependencies(missDeps);
+
+ String db = "default";
+ String table = "tablename";
+ String hcatDependency = getPushMissingDependencies(db, table);
+
+ actionBean.setPushMissingDependencies(hcatDependency);
+
+ // Make only pull dependencies available
+ createDir(getTestCaseDir() + "/2009/29/");
+ createDir(getTestCaseDir() + "/2009/22/");
+ createDir(getTestCaseDir() + "/2009/15/");
+ createDir(getTestCaseDir() + "/2009/08/");
+ sleep(1000);
+
+ Element eAction = createActionElement(actionXml);
+
+ String newactionXml = CoordCommandUtils.dryRunCoord(eAction,
actionBean);
+
+ eAction = XmlUtils.parseXml(newactionXml);
+
+ Element configElem = eAction.getChild("action",
eAction.getNamespace())
+ .getChild("workflow",
eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+ List<?> elementList = configElem.getChildren("property",
configElem.getNamespace());
+ Element e1 = (Element) elementList.get(0);
+ Element e2 = (Element) elementList.get(1);
+ // Make sure conf is not resolved as pull dependencies are met but
+ // push deps are not met
+ assertEquals("${coord:dataIn('A')}", e1.getChild("value",
e1.getNamespace()).getValue());
+ assertEquals("${coord:dataOut('LOCAL_A')}", e2.getChild("value",
e2.getNamespace()).getValue());
+
+ populateTable(db, table);
+ newactionXml = CoordCommandUtils.dryRunCoord(eAction, actionBean);
+
+ eAction = XmlUtils.parseXml(newactionXml);
+ configElem = eAction.getChild("action", eAction.getNamespace())
+ .getChild("workflow",
eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+ elementList = configElem.getChildren("property",
configElem.getNamespace());
+ e1 = (Element) elementList.get(0);
+ e2 = (Element) elementList.get(1);
+ // Check for resolved conf
+ assertEquals(
+
"file://,testDir/2009/29,file://,testDir/2009/22,file://,testDir/2009/15,file://,testDir/2009/08",
+ e1.getChild("value", e1.getNamespace()).getValue());
+ assertEquals("file://,testDir/2009/29", e2.getChild("value",
e1.getNamespace()).getValue());
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private String getPullMissingDependencies(String testDir) {
+ String missDeps =
"file://#testDir/2009/29/_SUCCESS#file://#testDir/2009/22/_SUCCESS#file"
+ +
"://#testDir/2009/15/_SUCCESS#file://#testDir/2009/08/_SUCCESS";
+ missDeps = missDeps.replaceAll("#testDir", testDir);
+ return missDeps;
+ }
+
+ private String getPushMissingDependencies(String db, String table) throws
Exception {
+ String newHCatDependency1 = "hcat://" + hcatServer + "/" + db + "/" +
table + "/dt=20120412;country=brazil";
+ String newHCatDependency2 = "hcat://" + hcatServer + "/" + db + "/" +
table + "/dt=20120430;country=usa";
+ String newHCatDependency = newHCatDependency1 +
CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+ dropTable(db, table, true);
+ dropDatabase(db, true);
+ createDatabase(db);
+ createTable(db, table, "dt,country");
+ return newHCatDependency;
+ }
+
+ private Element createActionElement(String actionXml) throws
JDOMException, ParseException {
+ Element eAction = XmlUtils.parseXml(actionXml);
+ eAction.removeAttribute("start");
+ eAction.removeAttribute("end");
+ eAction.setAttribute("instance-number", Integer.toString(1));
+ eAction.setAttribute("action-nominal-time",
+
DateUtils.formatDateOozieTZ(DateUtils.parseDateOozieTZ("2009-09-08T01:00Z")));
+ eAction.setAttribute("action-actual-time",
DateUtils.formatDateOozieTZ(new Date()));
+ return eAction;
+ }
+
+ private CoordinatorActionBean createCoordinatorActionBean(CoordinatorJob
job) throws IOException {
+ CoordinatorActionBean actionBean = new CoordinatorActionBean();
+ String actionId =
Services.get().get(UUIDService.class).generateChildId(job.getId(), "1");
+ actionBean.setJobId(job.getId());
+ actionBean.setId(actionId);
+ Configuration jobConf = new XConfiguration(new
StringReader(job.getConf()));
+ actionBean.setRunConf(XmlUtils.prettyPrint(jobConf).toString());
+ return actionBean;
+ }
+
+ private void createDir(String dir) {
+ Process pr;
+ try {
+ pr = Runtime.getRuntime().exec("mkdir -p " + dir + "/_SUCCESS");
+ pr.waitFor();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void populateTable(String db, String table) throws Exception {
+ addPartition(db, table, "dt=20120430;country=usa");
+ addPartition(db, table, "dt=20120412;country=brazil");
+ addPartition(db, table, "dt=20120413;country=brazil");
+ }
+
+}
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1456785&r1=1456784&r2=1456785&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Fri Mar 15 07:01:06 2013
@@ -7,6 +7,7 @@ OOZIE-1239 Bump up trunk to 4.1.0-SNAPSH
-- Oozie 4.0.0 (unreleased)
+OOZIE-1267 Dryrun option for push missing deps (virag)
OOZIE-1263 Fix few HCat dependency check issues (rohini via virag)
OOZIE-1261 Registered push dependencies are not removed on Coord Kill command
(virag)
OOZIE-1191 add examples of coordinator with SLA tag inserted (ryota via mona)