Repository: oozie Updated Branches: refs/heads/master 872db60c8 -> b3b75189e
OOZIE-1680 Add a check for a maximum frequency of 5 min on Coord jobs (rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b3b75189 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b3b75189 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b3b75189 Branch: refs/heads/master Commit: b3b75189e0c1d336d2b3ba7c41a0efea4b8b67de Parents: 872db60 Author: Robert Kanter <[email protected]> Authored: Tue Feb 25 16:05:00 2014 -0800 Committer: Robert Kanter <[email protected]> Committed: Tue Feb 25 16:05:00 2014 -0800 ---------------------------------------------------------------------- .../command/coord/CoordSubmitXCommand.java | 17 ++- core/src/main/resources/oozie-default.xml | 10 ++ .../oozie/TestCoordinatorEngineStreamLog.java | 109 ++++++++++--------- .../command/coord/TestCoordSubmitXCommand.java | 52 +++++++++ release-log.txt | 1 + 5 files changed, 134 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java index 712fe51..9e6a3d5 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java @@ -131,9 +131,10 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand { */ public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout"; - public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size"; + public static final String CONF_CHECK_MAX_FREQUENCY = Service.CONF_PREFIX + 
"coord.check.maximum.frequency"; + private ELEvaluator evalFreq = null; private ELEvaluator evalNofuncs = null; private ELEvaluator evalData = null; @@ -314,9 +315,19 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand { throw new IllegalArgumentException("Coordinator Start Time must be earlier than End Time."); } - // Check if a coord job with cron frequency will materialize actions try { - Integer.parseInt(coordJob.getFrequency()); + // Check if a coord job with cron frequency will materialize actions + int freq = Integer.parseInt(coordJob.getFrequency()); + + // Check if the frequency is faster than 5 min if enabled + if (Services.get().getConf().getBoolean(CONF_CHECK_MAX_FREQUENCY, true)) { + CoordinatorJob.Timeunit unit = coordJob.getTimeUnit(); + if (freq == 0 || (freq < 5 && unit == CoordinatorJob.Timeunit.MINUTE)) { + throw new IllegalArgumentException("Coordinator job with frequency [" + freq + + "] minutes is faster than allowed maximum of 5 minutes (" + + CONF_CHECK_MAX_FREQUENCY + " is set to true)"); + } + } } catch (NumberFormatException e) { Date start = coordJob.getStartTime(); Calendar cal = Calendar.getInstance(); http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 0fd1796..34362aa 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -497,6 +497,16 @@ this factor X the total queue size.</description> </property> + <property> + <name>oozie.service.coord.check.maximum.frequency</name> + <value>true</value> + <description> + When true, Oozie will reject any coordinators with a frequency faster than 5 minutes. 
It is not recommended to disable + this check or submit coordinators with frequencies faster than 5 minutes: doing so can cause unintended behavior and + additional system stress. + </description> + </property> + <!-- ELService --> <!-- List of supported groups for ELService --> <property> http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java index 5a1cb9b..b4f161a 100644 --- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java +++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java @@ -233,64 +233,69 @@ public class TestCoordinatorEngineStreamLog extends XFsTestCase { } private String runJobsImpl(final CoordinatorEngine ce, int count) throws Exception { - services.setService(DummyXLogStreamingService.class); - // need to re-define the parameters that are cleared upon the service - // reset: - new DagXLogInfoService().init(services); - - Configuration conf = new XConfiguration(); - - final String appPath = getTestCaseFileUri("coordinator.xml"); - final long now = System.currentTimeMillis(); - final String start = DateUtils.formatDateOozieTZ(new Date(now)); - long e = now + 1000 * 60 * count; - final String end = DateUtils.formatDateOozieTZ(new Date(e)); - - String wfXml = IOUtils.getResourceAsString("wf-no-op.xml", -1); - writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml"); - - String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:minutes(1)}\" start=\"" + start - + "\" end=\"" + end + "\" timezone=\"UTC\" " + "xmlns=\"uri:oozie:coordinator:0.1\"> " + "<controls> " - + " <timeout>1</timeout> " + " <concurrency>1</concurrency> " + " <execution>LIFO</execution> " - + "</controls> " + "<action> " + " 
<workflow> " + " <app-path>" + getFsTestCaseDir() - + "/workflow.xml</app-path>" - + " <configuration> <property> <name>inputA</name> <value>valueA</value> </property> " - + " <property> <name>inputB</name> <value>valueB</value> " + " </property></configuration> " - + "</workflow>" + "</action> " + "</coordinator-app>"; - writeToFile(appXml, appPath); - conf.set(OozieClient.COORDINATOR_APP_PATH, appPath); - conf.set(OozieClient.USER_NAME, getTestUser()); - - final String jobId = ce.submitJob(conf, true); - waitFor(1000 * 60 * count, new Predicate() { - @Override - public boolean evaluate() throws Exception { - try { - List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions(); - if (actions.size() < 1) { - return false; - } - for (CoordinatorAction action : actions) { - CoordinatorAction.Status actionStatus = action.getStatus(); - if (actionStatus != CoordinatorAction.Status.SUCCEEDED) { + try { + Services.get().getConf().setBoolean("oozie.service.coord.check.maximum.frequency", false); + services.setService(DummyXLogStreamingService.class); + // need to re-define the parameters that are cleared upon the service + // reset: + new DagXLogInfoService().init(services); + + Configuration conf = new XConfiguration(); + + final String appPath = getTestCaseFileUri("coordinator.xml"); + final long now = System.currentTimeMillis(); + final String start = DateUtils.formatDateOozieTZ(new Date(now)); + long e = now + 1000 * 60 * count; + final String end = DateUtils.formatDateOozieTZ(new Date(e)); + + String wfXml = IOUtils.getResourceAsString("wf-no-op.xml", -1); + writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml"); + + String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:minutes(1)}\" start=\"" + start + + "\" end=\"" + end + "\" timezone=\"UTC\" " + "xmlns=\"uri:oozie:coordinator:0.1\"> " + "<controls> " + + " <timeout>1</timeout> " + " <concurrency>1</concurrency> " + " <execution>LIFO</execution> " + + "</controls> " + "<action> " + " 
<workflow> " + " <app-path>" + getFsTestCaseDir() + + "/workflow.xml</app-path>" + + " <configuration> <property> <name>inputA</name> <value>valueA</value> </property> " + + " <property> <name>inputB</name> <value>valueB</value> " + " </property></configuration> " + + "</workflow>" + "</action> " + "</coordinator-app>"; + writeToFile(appXml, appPath); + conf.set(OozieClient.COORDINATOR_APP_PATH, appPath); + conf.set(OozieClient.USER_NAME, getTestUser()); + + final String jobId = ce.submitJob(conf, true); + waitFor(1000 * 60 * count, new Predicate() { + @Override + public boolean evaluate() throws Exception { + try { + List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions(); + if (actions.size() < 1) { return false; } + for (CoordinatorAction action : actions) { + CoordinatorAction.Status actionStatus = action.getStatus(); + if (actionStatus != CoordinatorAction.Status.SUCCEEDED) { + return false; + } + } + return true; + } + catch (Exception ex) { + ex.printStackTrace(); + return false; } - return true; - } - catch (Exception ex) { - ex.printStackTrace(); - return false; } + }); + // Assert all the actions are succeeded (useful for waitFor() timeout + // case): + final List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions(); + for (CoordinatorAction action : actions) { + assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus()); } - }); - // Assert all the actions are succeeded (useful for waitFor() timeout - // case): - final List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions(); - for (CoordinatorAction action : actions) { - assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus()); + return jobId; + } finally { + Services.get().getConf().setBoolean("oozie.service.coord.check.maximum.frequency", true); } - return jobId; } private void writeToFile(String content, Path appPath, String fileName) throws IOException { 
http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java index 88756f6..8dc0c44 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java @@ -1066,6 +1066,58 @@ public class TestCoordSubmitXCommand extends XDataTestCase { , job.getJobXml().contains(URI_TEMPLATE_INCLUDE_XML)); } + /** + * Frequency faster/slower than the maximum + * + * @throws Exception + */ + public void testCheckMaximumFrequency() throws Exception { + assertTrue(Services.get().getConf().getBoolean("oozie.service.coord.check.maximum.frequency", false)); + _testCheckMaximumFrequencyHelper("5"); + _testCheckMaximumFrequencyHelper("10"); + _testCheckMaximumFrequencyHelper("${coord:hours(2)}"); + _testCheckMaximumFrequencyHelper("${coord:days(3)}"); + _testCheckMaximumFrequencyHelper("${coord:months(4)}"); + try { + _testCheckMaximumFrequencyHelper("3"); + fail(); + } catch (CommandException ce) { + assertEquals(ErrorCode.E1003, ce.getErrorCode()); + assertTrue(ce.getMessage().contains("Coordinator job with frequency [3] minutes is faster than allowed maximum of 5 " + + "minutes")); + } + try { + Services.get().getConf().setBoolean("oozie.service.coord.check.maximum.frequency", false); + _testCheckMaximumFrequencyHelper("5"); + _testCheckMaximumFrequencyHelper("10"); + _testCheckMaximumFrequencyHelper("${coord:hours(2)}"); + _testCheckMaximumFrequencyHelper("${coord:days(3)}"); + _testCheckMaximumFrequencyHelper("${coord:months(4)}"); + _testCheckMaximumFrequencyHelper("3"); + } finally { + 
Services.get().getConf().setBoolean("oozie.service.coord.check.maximum.frequency", true); + } + } + + private void _testCheckMaximumFrequencyHelper(String freq) throws Exception { + Configuration conf = new XConfiguration(); + File appPathFile = new File(getTestCaseDir(), "coordinator.xml"); + String appXml = "<coordinator-app name=\"NAME\" frequency=\"" + freq + "\" start=\"2009-02-01T01:00Z\" " + + "end=\"2009-02-03T23:59Z\" timezone=\"UTC\" " + + "xmlns=\"uri:oozie:coordinator:0.2\"> " + + "<action> <workflow> <app-path>hdfs:///tmp/workflows/</app-path> " + + "<configuration> <property> <name>inputA</name> <value>blah</value> </property> " + + "</configuration> </workflow> </action> </coordinator-app>"; + writeToFile(appXml, appPathFile); + conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString()); + conf.set(OozieClient.USER_NAME, getTestUser()); + CoordSubmitXCommand sc = new CoordSubmitXCommand(conf); + String jobId = sc.call(); + + assertEquals(jobId.substring(jobId.length() - 2), "-C"); + checkCoordJobs(jobId); + } + private void _testConfigDefaults(boolean withDefaults) throws Exception { Configuration conf = new XConfiguration(); File appPathFile = new File(getTestCaseDir(), "coordinator.xml"); http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index fa0540f..be20154 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1680 Add a check for a maximum frequency of 5 min on Coord jobs (rkanter) OOZIE-1699 Some of the commands submitted to Oozie internal queue are never executed (sriksun via virag) OOZIE-1671 add an option to limit # of coordinator actions for log retrieval (ryota) OOZIE-1629 EL function in <timeout> is not evaluated properly (ryota)
