This is an automated email from the ASF dual-hosted git repository.
dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git
The following commit(s) were added to refs/heads/master by this push:
new 5931b95f7 OOZIE-3669 Fix purge process for bundles to prevent orphan
coordinators (jmakai via dionusos)
5931b95f7 is described below
commit 5931b95f777b209c463dc74de88d245ab645263d
Author: Denes Bodo <[email protected]>
AuthorDate: Sat Nov 12 10:28:04 2022 +0100
OOZIE-3669 Fix purge process for bundles to prevent orphan coordinators
(jmakai via dionusos)
---
.../org/apache/oozie/command/PurgeXCommand.java | 81 +++++++++++++---
.../apache/oozie/command/TestPurgeXCommand.java | 108 +++++++++++++++++++++
release-log.txt | 1 +
3 files changed, 174 insertions(+), 16 deletions(-)
diff --git a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
index 0e9bebbe2..833c7c17a 100644
--- a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
@@ -366,6 +366,56 @@ public class PurgeXCommand extends XCommand<Void> {
purgeCoordinators(coordsToPurge);
}
+ /**
+ * Process coordinators in a Bundle to purge them and their children.
+ * Returns <tt>true</tt> if all of the coordinators (and their children)
were purged.
+ *
+ * @param coords List of coordinators to process
+ * @throws JPAExecutorException If a JPA executor has a problem
+ * @return <tt>true</tt> if all of the coordinators (and their children)
were purged.
+ */
+ private boolean processCoordinatorsInBundle(List<String> coords) throws
JPAExecutorException {
+ List<String> wfActionsInBundle = new ArrayList<>();
+ List<WorkflowJobBean> wfjBeanListInBundle = new ArrayList<>();
+ List<String> wfsToPurgeInBundle;
+ for (String coord : coords) {
+ // Get all of the direct workflow children in this Coordinator
+ List<WorkflowJobBean> wfjBeanListInCoord = new ArrayList<>();
+ int size;
+ do {
+ size = wfjBeanListInCoord.size();
+ wfjBeanListInCoord.addAll(jpaService.execute(
+ new
WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coord,
wfjBeanListInCoord.size(), limit)));
+ } while (size != wfjBeanListInCoord.size());
+ wfjBeanListInBundle.addAll(wfjBeanListInCoord);
+
+ // Get all of the action children in this Coordinator
+ List<String> wfActionsInCoord = new ArrayList<>();
+ do {
+ size = wfActionsInCoord.size();
+ wfActionsInCoord.addAll(jpaService.execute(
+ new CoordActionsGetFromCoordJobIdJPAExecutor(coord,
wfActionsInCoord.size(), limit)));
+ } while (size != wfActionsInCoord.size());
+ wfActionsInBundle.addAll(wfActionsInCoord);
+ }
+
+ // Get all of the purgable workflows in this Bundle
+ wfsToPurgeInBundle = fetchTerminatedWorkflow(wfjBeanListInBundle);
+
+ // Only purging the Bundle and all of its elements if all of the
workflows are purgable
+ if (wfjBeanListInBundle.size() == wfsToPurgeInBundle.size()) {
+ // Process the children workflow
+ processWorkflows(wfsToPurgeInBundle);
+ // Process the children action
+ purgeCoordActions(wfActionsInBundle);
+ // Now that all children have been purged, we can purge the
coordinators
+ purgeCoordinators(coords);
+
+ return true;
+ }
+ return false;
+ }
+
/**
* Process bundles to purge them and their children
*
@@ -373,28 +423,27 @@ public class PurgeXCommand extends XCommand<Void> {
* @throws JPAExecutorException If a JPA executor has a problem
*/
private void processBundles(List<String> bundles) throws
JPAExecutorException {
- List<String> coordsToPurge = new ArrayList<String>();
- List<String> bundlesToPurge = new ArrayList<String>();
- for (Iterator<String> it = bundles.iterator(); it.hasNext(); ) {
- String bundleId = it.next();
- // We only purge the bundle and its children if they are all ready
to be purged
+ List<String> bundlesToPurge = new ArrayList<>();
+ for (String bundle : bundles) {
long numChildrenNotReady = jpaService.execute(
- new
CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId));
+ new
CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundle));
if (numChildrenNotReady == 0) {
- bundlesToPurge.add(bundleId);
- LOG.debug("Purging bundle " + bundleId);
- // Get all of the direct children for this bundle
- List<String> children = new ArrayList<String>();
+ // Get all of the coordinators in this bundle
+ List<String> coordsToPurge = new ArrayList<>();
int size;
do {
- size = children.size();
- children.addAll(jpaService.execute(new
CoordJobsGetFromParentIdJPAExecutor(bundleId, children.size(), limit)));
- } while (size != children.size());
- coordsToPurge.addAll(children);
+ size = coordsToPurge.size();
+ coordsToPurge.addAll(jpaService.execute(new
CoordJobsGetFromParentIdJPAExecutor(bundle,
+ coordsToPurge.size(), limit)));
+ } while (size != coordsToPurge.size());
+
+ boolean isAllCoordsPurged =
processCoordinatorsInBundle(coordsToPurge);
+ if (isAllCoordsPurged) {
+ bundlesToPurge.add(bundle);
+ LOG.debug("Purging bundle " + bundle);
+ }
}
}
- // Process the children
- processCoordinators(coordsToPurge);
// Now that all children have been purged, we can purge the bundles
purgeBundles(bundlesToPurge);
}
diff --git a/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
b/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
index 0f0b8590e..209907ba0 100644
--- a/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
@@ -319,6 +319,114 @@ public class TestPurgeXCommand extends XDataTestCase {
assertBundleActionNotPurged(bundleActionBean2);
}
+ /**
+ * Test : purge bundle job while one of its child workflows is not yet
purgable
+ * because its end time is in the future, with more coordinators and workflows
than the limit (100).
+ * ([102] workflows, [102] coordinatorActions, [101] coordinators, [1]
bundles)
+ *
+ * [Scenario]:
+ *
+ * Bundle[bundleJob] --- Coordinator[coordJob(1..100)] ---
Workflow[wfJob(1..100)] <= Ready to be purged
+ * |
+ * \
+ * `- Coordinator[coordJob(101)] ---
Workflow[wfJob(101)] <= NOT ready to be purged yet
+ * |
+ * \
+ * `- Workflow[wfJob(102)]
<= Ready to be purged
+ *
+ *
+ * [Expectation]: wfJob(101) is not ready to be purged yet, therefore the
bundleJob and the
+ * coordJobs and all of their wfJobs should not be purged.
+ *
+ * @throws Exception if cannot insert records to the database
+ */
+ public void testBundleIsPurgableButChildrenWfIsNotExceedingLimit() throws
Exception {
+ BundleJobBean bundleJob =
addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ(
+ "2011-01-01T01:00Z"));
+
+ for(int i = 1 ; i < 101 ; i++) {
+ CoordinatorJobBean coordJob =
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+ WorkflowJobBean wfJob =
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
WorkflowInstance.Status.SUCCEEDED);
+ WorkflowActionBean wfAction =
addRecordToWfActionTable(wfJob.getId(), String.valueOf(i),
WorkflowAction.Status.OK);
+ CoordinatorActionBean coordAction =
addRecordToCoordActionTable(coordJob.getId(), i,
CoordinatorAction.Status.SUCCEEDED,
+ "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
+ BundleActionBean bundleActionBean1 =
addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(),
+ "action"+i, 0, Job.Status.RUNNING);
+ }
+
+ CoordinatorJobBean coordJob101 =
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+ WorkflowJobBean wfJob101 =
addRecordToWfJobTableForNegCase(WorkflowJob.Status.SUCCEEDED,
+ WorkflowInstance.Status.SUCCEEDED);
+ WorkflowActionBean wfAction101 =
addRecordToWfActionTable(wfJob101.getId(), "2", WorkflowAction.Status.OK);
+ WorkflowJobBean wfJob102 =
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
WorkflowInstance.Status.SUCCEEDED);
+ WorkflowActionBean wfAction102 =
addRecordToWfActionTable(wfJob101.getId(), "3", WorkflowAction.Status.OK);
+ CoordinatorActionBean coordAction101 =
addRecordToCoordActionTable(coordJob101.getId(), 2,
+ CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml",
wfJob101.getId(), "SUCCEEDED", 0);
+ CoordinatorActionBean coordAction102 =
addRecordToCoordActionTable(coordJob101.getId(), 3,
+ CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml",
wfJob102.getId(), "SUCCEEDED", 0);
+ BundleActionBean bundleActionBean2 =
addRecordToBundleActionTable(bundleJob.getId(), coordJob101.getId(),
+ "action101", 0, Job.Status.RUNNING);
+
+ purgeWithDefaultParameters();
+
+ assertBundleJobNotPurged(bundleJob);
+ }
+
+ /**
+ * Test : purge bundle job while one of its child workflows is not yet
purgable
+ * because its end time is in the future.
+ *
+ * [Scenario]:
+ *
+ * Bundle[bundleJob] --- Coordinator[coordJob1] --- Workflow[wfJob1] <=
Ready to be purged
+ * |
+ * \
+ * `- Coordinator[coordJob2] --- Workflow[wfJob2] <=
NOT ready to be purged yet
+ * |
+ * \
+ * `- Workflow[wfJob3] <=
Ready to be purged
+ *
+ *
+ * [Expectation]: wfJob2 is not ready to be purged yet, therefore the
bundleJob and the
+ * coordJobs and all of their wfJobs should not be purged.
+ *
+ * @throws Exception if cannot insert records to the database
+ */
+ public void testBundleIsPurgableButChildrenWfIsNot() throws Exception {
+ BundleJobBean bundleJob =
addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ(
+ "2011-01-01T01:00Z"));
+
+ CoordinatorJobBean coordJob1 =
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+ WorkflowJobBean wfJob1 =
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
WorkflowInstance.Status.SUCCEEDED);
+ WorkflowActionBean wfAction1 =
addRecordToWfActionTable(wfJob1.getId(), "1", WorkflowAction.Status.OK);
+ CoordinatorActionBean coordAction =
addRecordToCoordActionTable(coordJob1.getId(), 1,
CoordinatorAction.Status.SUCCEEDED,
+ "coord-action-get.xml", wfJob1.getId(), "SUCCEEDED", 0);
+
+ CoordinatorJobBean coordJob2 =
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+ WorkflowJobBean wfJob2 =
addRecordToWfJobTableForNegCase(WorkflowJob.Status.SUCCEEDED,
WorkflowInstance.Status.SUCCEEDED);
+ WorkflowActionBean wfAction2 =
addRecordToWfActionTable(wfJob2.getId(), "2", WorkflowAction.Status.OK);
+ WorkflowJobBean wfJob3 =
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED,
WorkflowInstance.Status.SUCCEEDED);
+ WorkflowActionBean wfAction3 =
addRecordToWfActionTable(wfJob3.getId(), "3", WorkflowAction.Status.OK);
+ CoordinatorActionBean coordAction2 =
addRecordToCoordActionTable(coordJob2.getId(), 2,
CoordinatorAction.Status.SUCCEEDED,
+ "coord-action-get.xml", wfJob2.getId(), "SUCCEEDED", 0);
+ CoordinatorActionBean coordAction3 =
addRecordToCoordActionTable(coordJob2.getId(), 3,
CoordinatorAction.Status.SUCCEEDED,
+ "coord-action-get.xml", wfJob3.getId(), "SUCCEEDED", 0);
+
+ BundleActionBean bundleActionBean1 =
addRecordToBundleActionTable(bundleJob.getId(), coordJob1.getId(),
+ "action1", 0, Job.Status.RUNNING);
+ BundleActionBean bundleActionBean2 =
addRecordToBundleActionTable(bundleJob.getId(), coordJob2.getId(),
+ "action2", 0, Job.Status.RUNNING);
+
+ purgeWithDefaultParameters();
+
+ assertBundleJobNotPurged(bundleJob);
+ assertCoordinatorJobNotPurged(coordJob1);
+ assertCoordinatorJobNotPurged(coordJob2);
+ assertWorkflowJobNotPurged(wfJob1);
+ assertWorkflowJobNotPurged(wfJob2);
+ assertWorkflowJobNotPurged(wfJob3);
+ }
+
/**
* Test : The workflow should get purged, but the coordinator parent
shouldn't get purged --> neither will get purged
*
diff --git a/release-log.txt b/release-log.txt
index 56ecdb9cb..f32679613 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.3.0 release (trunk - unreleased)
+OOZIE-3669 Fix purge process for bundles to prevent orphan coordinators
(jmakai via dionusos)
OOZIE-3254 [coordinator] LAST_ONLY and NONE execution modes: possible
OutOfMemoryError when there are too many coordinator actions to materialize
(jmakai via dionusos)
OOZIE-3666 Oozie log streaming bug when log timestamps are the same on
multiple Oozie servers (jmakai via dionusos)
OOZIE-3661 Oozie cannot handle environment variables with key=value content
(dionusos via asalamon74)