Repository: oozie Updated Branches: refs/heads/master 70052969a -> 5abd3e6a5
OOZIE-2444 Need conditional logic in bundles Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/5abd3e6a Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/5abd3e6a Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/5abd3e6a Branch: refs/heads/master Commit: 5abd3e6a598bf073b1b0aded878d746ccb482da0 Parents: 7005296 Author: Purshotam Shah <[email protected]> Authored: Tue Jan 26 10:46:45 2016 -0800 Committer: Purshotam Shah <[email protected]> Committed: Tue Jan 26 10:46:45 2016 -0800 ---------------------------------------------------------------------- client/src/main/resources/oozie-bundle-0.2.xsd | 1 + .../command/bundle/BundleStartXCommand.java | 34 ++++++++ .../command/bundle/TestBundleStartXCommand.java | 28 ++++++ .../org/apache/oozie/test/XDataTestCase.java | 91 ++++++++++++++++++++ core/src/test/resources/bundle-submit-job.xml | 2 +- docs/src/site/twiki/BundleFunctionalSpec.twiki | 7 +- release-log.txt | 1 + 7 files changed, 161 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/client/src/main/resources/oozie-bundle-0.2.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/oozie-bundle-0.2.xsd b/client/src/main/resources/oozie-bundle-0.2.xsd index b4e37e5..5668a92 100644 --- a/client/src/main/resources/oozie-bundle-0.2.xsd +++ b/client/src/main/resources/oozie-bundle-0.2.xsd @@ -58,6 +58,7 @@ </xs:sequence> <xs:attribute name="name" type="xs:string" use="required"/> <xs:attribute name="critical" type="xs:string" use="optional"/> + <xs:attribute name="enabled" type="xs:string" use="optional"/> </xs:complexType> <xs:complexType name="CONFIGURATION"> <xs:sequence> http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java index e026efb..27ae4a4 100644 --- a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java @@ -174,6 +174,11 @@ public class BundleStartXCommand extends StartTransitionXCommand { if (map.containsKey(name.getValue())) { throw new CommandException(ErrorCode.E1304, name); } + Configuration coordConf = mergeConfig(elem); + // skip coord job if it is not enabled + if (!isEnabled(elem, coordConf)) { + continue; + } boolean isCritical = false; if (critical != null && Boolean.parseBoolean(critical.getValue())) { isCritical = true; @@ -249,6 +254,10 @@ public class BundleStartXCommand extends StartTransitionXCommand { if (OozieJobInfo.isJobInfoEnabled()) { coordConf.set(OozieJobInfo.BUNDLE_NAME, bundleJob.getAppName()); } + // skip coord job if it is not enabled + if (!isEnabled(coordElem, coordConf)) { + continue; + } String coordName=name.getValue(); try { coordName = ELUtils.resolveAppName(coordName, coordConf); @@ -350,4 +359,29 @@ public class BundleStartXCommand extends StartTransitionXCommand { public void updateJob() throws CommandException { updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob)); } + + /** + * Checks whether the coordinator is enabled + * + * @param coordElem + * @param coordConf + * @return true if coordinator is enabled, otherwise false. + * @throws CommandException + */ + private boolean isEnabled(Element coordElem, Configuration coordConf) throws CommandException { + Attribute enabled = coordElem.getAttribute("enabled"); + if (enabled == null) { + // default is true + return true; + } + String isEnabled = enabled.getValue(); + try { + isEnabled = ELUtils.resolveAppName(isEnabled, coordConf); + } + catch (Exception e) { + throw new CommandException(ErrorCode.E1321, e.getMessage(), e); + } + return Boolean.parseBoolean(isEnabled); + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java index bcd7d24..cfb8b4e 100644 --- a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java @@ -155,6 +155,34 @@ public class TestBundleStartXCommand extends XDataTestCase { } /** + * Test : Start bundle job when certain coord jobs are not enabled + * + * @throws Exception + */ + public void testBundleStart3() throws Exception { + BundleJobBean job = this.addRecordToBundleJobTableDisabledCoord(Job.Status.PREP); + + JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull(jpaService); + BundleJobGetJPAExecutor bundleJobGetExecutor = new BundleJobGetJPAExecutor(job.getId()); + job = jpaService.execute(bundleJobGetExecutor); + assertEquals(job.getStatus(), Job.Status.PREP); + + new BundleStartXCommand(job.getId()).call(); + + job = jpaService.execute(bundleJobGetExecutor); + assertEquals(job.getStatus(), Job.Status.RUNNING); + + sleep(2000); + + List<BundleActionBean> actions = BundleActionQueryExecutor.getInstance() + .getList(BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, job.getId()); + + assertEquals(1, actions.size()); + assertEquals(job.getId(), actions.get(0).getBundleId()); + } + + /** * Test : Start bundle job with dryrun * * @throws Exception http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java index 4c45dca..a9aa79a 100644 --- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java @@ -1030,6 +1030,95 @@ public abstract class XDataTestCase extends XHCatTestCase { } /** + * Insert bundle job for testing. + * + * @param jobStatus job status + * @param pending true if pending + * @return bundle job bean + * @throws Exception + */ + protected BundleJobBean addRecordToBundleJobTableDisabledCoord(Job.Status jobStatus) throws Exception { + BundleJobBean bundle = createBundleJobCoordDisabled(jobStatus); + try { + JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull(jpaService); + BundleJobInsertJPAExecutor bundleInsertjpa = new BundleJobInsertJPAExecutor(bundle); + jpaService.execute(bundleInsertjpa); + } + catch (JPAExecutorException ce) { + ce.printStackTrace(); + fail("Unable to insert the test bundle job record to table"); + throw ce; + } + return bundle; + } + + /** + * Creates bundle job bean with one disabled coordinator + * + * @param jobStatus job status + * @return bundle job bean + * @throws Exception + */ + protected BundleJobBean createBundleJobCoordDisabled(Job.Status jobStatus) throws Exception { + return createBundleJobCoordDisabled(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE), + jobStatus); + } + + /** + * Creates bundle job bean with one disabled coordinator + * + * @param jobID + * @param jobStatus job status + * @return bundle job bean + * @throws Exception + */ + protected BundleJobBean createBundleJobCoordDisabled(String jobID, Job.Status jobStatus) throws Exception { + Path coordPath1 = new Path(getFsTestCaseDir(), "coord1"); + Path coordPath2 = new Path(getFsTestCaseDir(), "coord2"); + writeCoordXml(coordPath1, "coord-job-bundle.xml"); + writeCoordXml(coordPath2, "coord-job-bundle.xml"); + + Path bundleAppPath = new Path(getFsTestCaseDir(), "bundle"); + String bundleAppXml = getBundleXml("bundle-submit-job.xml"); + assertNotNull(bundleAppXml); + assertTrue(bundleAppXml.length() > 0); + + bundleAppXml = bundleAppXml.replaceAll("#app_path1", + Matcher.quoteReplacement(new Path(coordPath1.toString(), "coordinator.xml").toString())); + bundleAppXml = bundleAppXml.replaceAll("#app_path2", + Matcher.quoteReplacement(new Path(coordPath2.toString(), "coordinator.xml").toString())); + + writeToFile(bundleAppXml, bundleAppPath, "bundle.xml"); + + Configuration conf = new XConfiguration(); + conf.set(OozieClient.BUNDLE_APP_PATH, bundleAppPath.toString()); + conf.set(OozieClient.USER_NAME, getTestUser()); + conf.set("jobTracker", getJobTrackerUri()); + conf.set("nameNode", getNameNodeUri()); + conf.set("appName", "bundle-app-name"); + conf.set("coordName1", "coord1"); + conf.set("coordName2", "coord2"); + conf.set("isEnabled", "false"); + + BundleJobBean bundle = new BundleJobBean(); + bundle.setId(jobID); + bundle.setAppName("BUNDLE-TEST"); + bundle.setAppPath(bundleAppPath.toString()); + bundle.setConf(XmlUtils.prettyPrint(conf).toString()); + bundle.setConsoleUrl("consoleUrl"); + bundle.setCreatedTime(new Date()); + bundle.setJobXml(bundleAppXml); + bundle.setLastModifiedTime(new Date()); + bundle.setOrigJobXml(bundleAppXml); + bundle.resetPending(); + bundle.setStatus(jobStatus); + bundle.setUser(conf.get(OozieClient.USER_NAME)); + bundle.setGroup(conf.get(OozieClient.GROUP_NAME)); + return bundle; + } + + /** * Create bundle action bean and save to db * * @param jobId bundle job id @@ -1409,6 +1498,7 @@ public abstract class XDataTestCase extends XHCatTestCase { conf.set("appName", "bundle-app-name"); conf.set("coordName1", "coord1"); conf.set("coordName2", "coord2"); + conf.set("isEnabled", "true"); BundleJobBean bundle = new BundleJobBean(); bundle.setId(jobID); @@ -1478,6 +1568,7 @@ public abstract class XDataTestCase extends XHCatTestCase { conf.set("coordName1", "coord1"); conf.set("coordName2", "coord2"); conf.set("coord1.starttime","2009-02-01T00:00Z"); + conf.set("isEnabled", "true"); BundleJobBean bundle = new BundleJobBean(); bundle.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE)); http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/core/src/test/resources/bundle-submit-job.xml ---------------------------------------------------------------------- diff --git a/core/src/test/resources/bundle-submit-job.xml b/core/src/test/resources/bundle-submit-job.xml index 6eda402..18ebda6 100644 --- a/core/src/test/resources/bundle-submit-job.xml +++ b/core/src/test/resources/bundle-submit-job.xml @@ -40,7 +40,7 @@ </property> </configuration> </coordinator> - <coordinator name='${coordName2}' critical='false'> + <coordinator name='${coordName2}' critical='false' enabled='${isEnabled}'> <app-path>#app_path2</app-path> <configuration> <property> http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/docs/src/site/twiki/BundleFunctionalSpec.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/BundleFunctionalSpec.twiki b/docs/src/site/twiki/BundleFunctionalSpec.twiki index c9252c0..c4e1e36 100644 --- a/docs/src/site/twiki/BundleFunctionalSpec.twiki +++ b/docs/src/site/twiki/BundleFunctionalSpec.twiki @@ -94,7 +94,9 @@ A bundle definition is defined in XML by a name, controls and one or more coordi * *%BLUE% controls: %ENDCOLOR%* The control specification for the bundle. * *%BLUE% kick-off-time: %ENDCOLOR%* It defines when the bundle job should start and submit the coordinator applications. This field is optional and the default is *NOW* that means the job should start right-a-way. * *%BLUE% coordinator: %ENDCOLOR%* Coordinator application specification. There should be at least one coordinator application in any bundle. - * *%BLUE% name: %ENDCOLOR%* Name of the coordinator application. It can be used for referring this application through bundle to control such as kill, suspend, rerun. + * *%BLUE% name: %ENDCOLOR%* Name of the coordinator application. It can be used for referring this application through + bundle to control such as kill, suspend, rerun. Enabled can be used to enable or disable a coordinator. It is optional. + The default value for enabled is true. * *%BLUE% app-path: %ENDCOLOR%* Path of the coordinator application definition in hdfs. This is a mandatory element. * *%BLUE% configuration: %ENDCOLOR%* A hadoop like configuration to parameterize corresponding coordinator application. This is optional. * *%BLUE% Parameterization: %ENDCOLOR%* Configuration properties that are a valid Java identifier, [A-Za-z_][0-9A-Za-z_]*, are available as =${NAME}= variables within the bundle application definition. Configuration properties that are not a valid Java identifier, for example =job.tracker=, are available via the =${bundle:conf(String name)}= function. Valid Java identifier properties are available via this function as well. @@ -107,7 +109,7 @@ A bundle definition is defined in XML by a name, controls and one or more coordi <controls> <kick-off-time>[DATETIME]</kick-off-time> </controls> - <coordinator name=[NAME] > + <coordinator name=[NAME] enabled=[TRUE | FALSE] > <app-path>[COORD-APPLICATION-PATH]</app-path> <configuration> <property> @@ -386,6 +388,7 @@ Refer to the [[DG_CoordinatorRerun][Rerunning Coordinator Actions]] for details </xs:sequence> <xs:attribute name="name" type="bundle:IDENTIFIER" use="required"/> <xs:attribute name="critical" type="xs:string" use="optional"/> + <xs:attribute name="enabled" type="xs:string" use="optional"/> </xs:complexType> <xs:complexType name="CONFIGURATION"> <xs:sequence> http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 1525ae4..5c2ee5b 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2444 Need conditional logic in bundles (satishsaley via puru) OOZIE-2394 Oozie can execute command without holding lock (puru) OOZIE-1922 MemoryLocksService fails if lock is acquired multiple times in same thread and released (puru) OOZIE-2432 TestPurgeXCommand fails (fdenes via rkanter)
