This is an automated email from the ASF dual-hosted git repository.

dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 5931b95f7 OOZIE-3669 Fix purge process for bundles to prevent orphan coordinators (jmakai via dionusos)
5931b95f7 is described below

commit 5931b95f777b209c463dc74de88d245ab645263d
Author: Denes Bodo <[email protected]>
AuthorDate: Sat Nov 12 10:28:04 2022 +0100

    OOZIE-3669 Fix purge process for bundles to prevent orphan coordinators (jmakai via dionusos)
---
 .../org/apache/oozie/command/PurgeXCommand.java    |  81 +++++++++++++---
 .../apache/oozie/command/TestPurgeXCommand.java    | 108 +++++++++++++++++++++
 release-log.txt                                    |   1 +
 3 files changed, 174 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
index 0e9bebbe2..833c7c17a 100644
--- a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
@@ -366,6 +366,56 @@ public class PurgeXCommand extends XCommand<Void> {
         purgeCoordinators(coordsToPurge);
     }
 
+    /**
+     * Purges the coordinators of a bundle, with their coordinator actions and child workflows, only if every
+     * child workflow is purgeable; otherwise nothing is purged, so no orphan coordinators are left behind.
+     *
+     * @param coords IDs of the coordinators that belong to the bundle
+     * @return {@code true} if all of the coordinators (and their children) were purged, {@code false}
+     *         if nothing was purged because at least one child workflow is not purgeable yet
+     * @throws JPAExecutorException If a JPA executor has a problem
+     */
+    private boolean processCoordinatorsInBundle(List<String> coords) throws JPAExecutorException {
+        List<String> wfsToPurgeInBundle = new ArrayList<>();
+        for (String coord : coords) {
+            // Get all of the direct workflow children in this Coordinator, 'limit' rows per query
+            List<WorkflowJobBean> wfjBeanListInCoord = new ArrayList<>();
+            int size;
+            do {
+                size = wfjBeanListInCoord.size();
+                wfjBeanListInCoord.addAll(jpaService.execute(
+                        new WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coord, wfjBeanListInCoord.size(), limit)));
+            } while (size != wfjBeanListInCoord.size());
+
+            // One non-purgeable workflow keeps the whole bundle: stop before loading anything else
+            List<String> wfsToPurgeInCoord = fetchTerminatedWorkflow(wfjBeanListInCoord);
+            if (wfsToPurgeInCoord.size() != wfjBeanListInCoord.size()) {
+                return false;
+            }
+            wfsToPurgeInBundle.addAll(wfsToPurgeInCoord);
+        }
+
+        // Every workflow is purgeable: get all of the coordinator action children of the coordinators
+        List<String> coordActionsInBundle = new ArrayList<>();
+        for (String coord : coords) {
+            List<String> coordActionsInCoord = new ArrayList<>();
+            int size;
+            do {
+                size = coordActionsInCoord.size();
+                coordActionsInCoord.addAll(jpaService.execute(
+                        new CoordActionsGetFromCoordJobIdJPAExecutor(coord, coordActionsInCoord.size(), limit)));
+            } while (size != coordActionsInCoord.size());
+            coordActionsInBundle.addAll(coordActionsInCoord);
+        }
+
+        // Purge bottom-up: workflows, then coordinator actions, then coordinators. NOTE(review): this assumes
+        // processWorkflows purges every workflow ID it is given; if it can skip one, its coordinator is still purged.
+        processWorkflows(wfsToPurgeInBundle);
+        purgeCoordActions(coordActionsInBundle);
+        purgeCoordinators(coords);
+        return true;
+    }
+
     /**
      * Process bundles to purge them and their children
      *
@@ -373,28 +423,27 @@ public class PurgeXCommand extends XCommand<Void> {
      * @throws JPAExecutorException If a JPA executor has a problem
      */
     private void processBundles(List<String> bundles) throws JPAExecutorException {
-        List<String> coordsToPurge = new ArrayList<String>();
-        List<String> bundlesToPurge = new ArrayList<String>();
-        for (Iterator<String> it = bundles.iterator(); it.hasNext(); ) {
-            String bundleId = it.next();
-            // We only purge the bundle and its children if they are all ready to be purged
+        List<String> bundlesToPurge = new ArrayList<>();
+        for (String bundleJobId : bundles) {
             long numChildrenNotReady = jpaService.execute(
-                    new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId));
+                    new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleJobId));
             if (numChildrenNotReady == 0) {
-                bundlesToPurge.add(bundleId);
-                LOG.debug("Purging bundle " + bundleId);
-                // Get all of the direct children for this bundle
-                List<String> children = new ArrayList<String>();
+                // Collect the IDs of every coordinator of this bundle, one page of 'limit' rows at a time
+                List<String> bundleCoords = new ArrayList<>();
                 int size;
                 do {
-                    size = children.size();
-                    children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, children.size(), limit)));
-                } while (size != children.size());
-                coordsToPurge.addAll(children);
+                    size = bundleCoords.size();
+                    bundleCoords.addAll(jpaService.execute(
+                            new CoordJobsGetFromParentIdJPAExecutor(bundleJobId, size, limit)));
+                } while (size != bundleCoords.size());
+
+                // The bundle may only be purged once all of its coordinators (and their children) were purged
+                if (processCoordinatorsInBundle(bundleCoords)) {
+                    LOG.debug("Purging bundle " + bundleJobId);
+                    bundlesToPurge.add(bundleJobId);
+                }
             }
         }
-        // Process the children
-        processCoordinators(coordsToPurge);
         // Now that all children have been purged, we can purge the bundles
         purgeBundles(bundlesToPurge);
     }
diff --git a/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java b/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
index 0f0b8590e..209907ba0 100644
--- a/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
@@ -319,6 +319,131 @@ public class TestPurgeXCommand extends XDataTestCase {
         assertBundleActionNotPurged(bundleActionBean2);
     }
 
+    /**
+     * Test : purge bundle job while one of its child workflows is not yet purgeable because its end
+     * time is in the future, with more coordinators (and workflows) than the purge limit (100).
+     * ([102] workflows, [102] coordinatorActions, [101] coordinators, [1] bundles)
+     *
+     * [Scenario]:
+     *
+     * Bundle[bundleJob] --- Coordinator[coordJob(1..100)] --- Workflow[wfJob(1..100)] <= Ready to be purged
+     *                  |
+     *                   \
+     *                    `- Coordinator[coordJob(101)] --- Workflow[wfJob(101)] <= NOT ready to be purged yet
+     *                                             |
+     *                                              \
+     *                                               `- Workflow[wfJob(102)] <= Ready to be purged
+     *
+     *
+     * [Expectation]: wfJob(101) is not ready to be purged yet, therefore neither the bundleJob nor
+     * any of the coordJobs or their wfJobs should be purged.
+     *
+     * @throws Exception if cannot insert records to the database
+     */
+    public void testBundleIsPurgableButChildrenWfIsNotExceedingLimit() throws Exception {
+        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ(
+                "2011-01-01T01:00Z"));
+
+        // coordJob(1..100) with wfJob(1..100): all purgeable on their own; references kept for the final assertions
+        CoordinatorJobBean[] coordJobs = new CoordinatorJobBean[100];
+        WorkflowJobBean[] wfJobs = new WorkflowJobBean[100];
+        for (int i = 1; i <= coordJobs.length; i++) {
+            CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+            WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+            addRecordToWfActionTable(wfJob.getId(), String.valueOf(i), WorkflowAction.Status.OK);
+            addRecordToCoordActionTable(coordJob.getId(), i, CoordinatorAction.Status.SUCCEEDED,
+                    "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
+            addRecordToBundleActionTable(bundleJob.getId(), coordJob.getId(), "action" + i, 0, Job.Status.RUNNING);
+            coordJobs[i - 1] = coordJob;
+            wfJobs[i - 1] = wfJob;
+        }
+
+        // coordJob(101) with the not-yet-purgeable wfJob(101) and the purgeable wfJob(102)
+        CoordinatorJobBean coordJob101 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        WorkflowJobBean wfJob101 = addRecordToWfJobTableForNegCase(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED);
+        addRecordToWfActionTable(wfJob101.getId(), "2", WorkflowAction.Status.OK);
+        WorkflowJobBean wfJob102 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        addRecordToWfActionTable(wfJob102.getId(), "3", WorkflowAction.Status.OK);
+        addRecordToCoordActionTable(coordJob101.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob101.getId(), "SUCCEEDED", 0);
+        addRecordToCoordActionTable(coordJob101.getId(), 3, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob102.getId(), "SUCCEEDED", 0);
+        BundleActionBean bundleActionBean101 = addRecordToBundleActionTable(bundleJob.getId(), coordJob101.getId(),
+                "action101", 0, Job.Status.RUNNING);
+
+        purgeWithDefaultParameters();
+
+        assertBundleJobNotPurged(bundleJob);
+        assertBundleActionNotPurged(bundleActionBean101);
+        assertCoordinatorJobNotPurged(coordJob101);
+        assertWorkflowJobNotPurged(wfJob101);
+        assertWorkflowJobNotPurged(wfJob102);
+        for (int i = 0; i < coordJobs.length; i++) {
+            assertCoordinatorJobNotPurged(coordJobs[i]);
+            assertWorkflowJobNotPurged(wfJobs[i]);
+        }
+    }
+
+    /**
+     * Test : purge bundle job while one of its child workflows is not yet purgeable
+     * because its end time is in the future.
+     *
+     * [Scenario]:
+     *
+     * Bundle[bundleJob] --- Coordinator[coordJob1] --- Workflow[wfJob1] <= Ready to be purged
+     *                  |
+     *                   \
+     *                    `- Coordinator[coordJob2] --- Workflow[wfJob2] <= NOT ready to be purged yet
+     *                                             |
+     *                                              \
+     *                                               `- Workflow[wfJob3] <= Ready to be purged
+     *
+     *
+     * [Expectation]: wfJob2 is not ready to be purged yet, therefore neither the bundleJob nor
+     * any of the coordJobs or their wfJobs should be purged.
+     *
+     * @throws Exception if cannot insert records to the database
+     */
+    public void testBundleIsPurgableButChildrenWfIsNot() throws Exception {
+        BundleJobBean bundleJob = addRecordToBundleJobTable(Job.Status.SUCCEEDED, DateUtils.parseDateOozieTZ(
+                "2011-01-01T01:00Z"));
+
+        CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        WorkflowJobBean wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        addRecordToWfActionTable(wfJob1.getId(), "1", WorkflowAction.Status.OK);
+        addRecordToCoordActionTable(coordJob1.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob1.getId(), "SUCCEEDED", 0);
+
+        CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        // wfJob2 is the workflow that is not ready to be purged yet
+        WorkflowJobBean wfJob2 = addRecordToWfJobTableForNegCase(WorkflowJob.Status.SUCCEEDED,
+                WorkflowInstance.Status.SUCCEEDED);
+        addRecordToWfActionTable(wfJob2.getId(), "2", WorkflowAction.Status.OK);
+        WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        addRecordToWfActionTable(wfJob3.getId(), "3", WorkflowAction.Status.OK);
+        addRecordToCoordActionTable(coordJob2.getId(), 2, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob2.getId(), "SUCCEEDED", 0);
+        addRecordToCoordActionTable(coordJob2.getId(), 3, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob3.getId(), "SUCCEEDED", 0);
+
+        BundleActionBean bundleActionBean1 = addRecordToBundleActionTable(bundleJob.getId(), coordJob1.getId(),
+                "action1", 0, Job.Status.RUNNING);
+        BundleActionBean bundleActionBean2 = addRecordToBundleActionTable(bundleJob.getId(), coordJob2.getId(),
+                "action2", 0, Job.Status.RUNNING);
+
+        purgeWithDefaultParameters();
+
+        assertBundleJobNotPurged(bundleJob);
+        assertBundleActionNotPurged(bundleActionBean1);
+        assertBundleActionNotPurged(bundleActionBean2);
+        assertCoordinatorJobNotPurged(coordJob1);
+        assertCoordinatorJobNotPurged(coordJob2);
+        assertWorkflowJobNotPurged(wfJob1);
+        assertWorkflowJobNotPurged(wfJob2);
+        assertWorkflowJobNotPurged(wfJob3);
+    }
+
     /**
      * Test : The workflow should get purged, but the coordinator parent shouldn't get purged --> neither will get purged
      *
diff --git a/release-log.txt b/release-log.txt
index 56ecdb9cb..f32679613 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.3.0 release (trunk - unreleased)
 
+OOZIE-3669 Fix purge process for bundles to prevent orphan coordinators (jmakai via dionusos)
 OOZIE-3254 [coordinator] LAST_ONLY and NONE execution modes: possible OutOfMemoryError when there are too many coordinator actions to materialize (jmakai via dionusos)
 OOZIE-3666 Oozie log streaming bug when log timestamps are the same on multiple Oozie servers (jmakai via dionusos)
 OOZIE-3661 Oozie cannot handle environment variables with key=value content (dionusos via asalamon74)

Reply via email to