This is an automated email from the ASF dual-hosted git repository.

andras pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new c15e4ed  OOZIE-3400 [core] Fix PurgeService sub-sub-workflow checking 
(asalamon74 via andras.piros)
c15e4ed is described below

commit c15e4ed2590e6b91eaa3211b123e2fa95795eec0
Author: Andras Piros <andras.pi...@cloudera.com>
AuthorDate: Wed Jan 2 11:33:18 2019 +0100

    OOZIE-3400 [core] Fix PurgeService sub-sub-workflow checking (asalamon74 
via andras.piros)
---
 core/src/main/java/org/apache/oozie/ErrorCode.java |   1 +
 .../org/apache/oozie/command/PurgeXCommand.java    | 146 +++++++++++++++------
 .../apache/oozie/command/TestPurgeXCommand.java    | 139 ++++++++++++++++++++
 .../oozie/command/TestSelectorTreeTraverser.java   | 118 +++++++++++++++++
 release-log.txt                                    |   1 +
 5 files changed, 363 insertions(+), 42 deletions(-)

diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java 
b/core/src/main/java/org/apache/oozie/ErrorCode.java
index 9cc153b..e274b9d 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -108,6 +108,7 @@ public enum ErrorCode {
     E0610(XLog.OPS, "Missing JPAService, StoreService cannot run without a 
JPAService"),
     E0611(XLog.OPS, "SQL error in operation [{0}], {1}"),
     E0612(XLog.OPS, "Could not get coordinator actions"),
+    E0613(XLog.OPS, "Workflow hierarchy error, cycle found {0}"),
 
     E0700(XLog.STD, "XML error, {0}"),
     E0701(XLog.STD, "XML schema error, {0}"),
diff --git a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java 
b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
index 42c3b28..b3bf30c 100644
--- a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.command;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
@@ -44,8 +45,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 /**
  * This class is used to purge workflows, coordinators, and bundles.  It takes 
into account the relationships between workflows and
@@ -69,6 +72,65 @@ public class PurgeXCommand extends XCommand<Void> {
     private int bundleDel;
     private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
 
+    interface JPAFunction<T, R> {
+        R apply(T t) throws JPAExecutorException;
+    }
+
+    final JPAFunction<String, List<WorkflowJobBean>> 
getSubWorkflowJobBeansFunction = new JPAFunction<String,
+            List<WorkflowJobBean>>() {
+        @Override
+        public List<WorkflowJobBean> apply(String wfId) throws 
JPAExecutorException {
+            return PurgeXCommand.this.getSubWorkflowJobBeans(wfId);
+        }
+    };
+
+    final JPAFunction<List<WorkflowJobBean>, List<String>> 
fetchTerminatedWorflowFunction = new JPAFunction<List<WorkflowJobBean>,
+            List<String>>() {
+        @Override
+        public List<String> apply(List<WorkflowJobBean> wfBeanList) throws 
JPAExecutorException {
+            return PurgeXCommand.this.fetchTerminatedWorkflow(wfBeanList);
+        }
+    };
+
+    @VisibleForTesting
+    static class SelectorTreeTraverser<T, U> {
+        final T rootNode;
+        final JPAFunction<T, List<U>> childrenFinder;
+        final JPAFunction<List<U>, List<T>> selector;
+
+        SelectorTreeTraverser(final T rootNode, final JPAFunction<T, List<U>> 
childrenFinder,
+                              final JPAFunction<List<U>, List<T>> selector) {
+            this.rootNode = rootNode;
+            this.childrenFinder = childrenFinder;
+            this.selector = selector;
+        }
+
+        List<T> findAllDescendantNodesIfSelectable() throws 
JPAExecutorException {
+            List<T> allDescendantNodes = new ArrayList<>();
+            Set<T> uniqueDescendantNodes = new HashSet<>();
+            allDescendantNodes.add(rootNode);
+            uniqueDescendantNodes.add(rootNode);
+            int nextIndexToCheck = 0;
+            while (nextIndexToCheck < allDescendantNodes.size()) {
+                T id = allDescendantNodes.get(nextIndexToCheck);
+                List<U> childrenNodes = childrenFinder.apply(id);
+                List<T> selectedChildren = selector.apply(childrenNodes);
+                if (selectedChildren.size() == childrenNodes.size()) {
+                    allDescendantNodes.addAll(selectedChildren);
+                    uniqueDescendantNodes.addAll(selectedChildren);
+                    if (allDescendantNodes.size() != 
uniqueDescendantNodes.size()) {
+                        throw new JPAExecutorException(ErrorCode.E0613, 
rootNode);
+                    }
+                }
+                else {
+                    return new ArrayList<>();
+                }
+                ++nextIndexToCheck;
+            }
+            return allDescendantNodes;
+        }
+    }
+
     public PurgeXCommand(int wfOlderThan, int coordOlderThan, int 
bundleOlderThan, int limit) {
         this(wfOlderThan, coordOlderThan, bundleOlderThan, limit, false);
     }
@@ -185,52 +247,45 @@ public class PurgeXCommand extends XCommand<Void> {
     }
 
     /**
-     * Process workflows to purge them and their children.  Uses the 
processWorkflowsHelper method to help via recursion to make
-     * sure that the workflow children are deleted before their parents.
+     * Process workflows to purge them and their children if all the 
descendants are purgeable. Skip the workflows that have
+     * non-purgeable descendants.
      *
      * @param wfs List of workflows to process
      * @throws JPAExecutorException If a JPA executor has a problem
      */
     private void processWorkflows(List<String> wfs) throws 
JPAExecutorException {
-        List<String> wfsToPurge = processWorkflowsHelper(wfs);
+        List<String> wfsToPurge = findPurgeableWorkflows(wfs);
         purgeWorkflows(wfsToPurge);
     }
 
     /**
-     * Used by the processWorkflows method and via recursion.
+     * Get purgeable workflow list.
      *
-     * @param wfs List of workflows to process
+     * @param workflows List of workflows to process
      * @return List of workflows to purge
      * @throws JPAExecutorException If a JPA executor has a problem
      */
-    private List<String> processWorkflowsHelper(List<String> wfs) throws 
JPAExecutorException {
-        // If the list is empty, then we've finished recursing
-        if (wfs.isEmpty()) {
-            return wfs;
+    private List<String> findPurgeableWorkflows(List<String> workflows) throws 
JPAExecutorException {
+        List<String> purgeableWorkflows = new ArrayList<>();
+        for (String workflowId : workflows) {
+            SelectorTreeTraverser<String, WorkflowJobBean> 
selectorTreeTraverser = new SelectorTreeTraverser<>(workflowId,
+                    getSubWorkflowJobBeansFunction, 
fetchTerminatedWorflowFunction);
+            
purgeableWorkflows.addAll(selectorTreeTraverser.findAllDescendantNodesIfSelectable());
         }
-        List<String> subwfs = new ArrayList<String>();
-        List<String> wfsToPurge = new ArrayList<String>();
-        for (String wfId : wfs) {
-            int size;
-            List<WorkflowJobBean> swfBeanList = new 
ArrayList<WorkflowJobBean>();
-            do {
-                size = swfBeanList.size();
-                swfBeanList.addAll(jpaService.execute(
-                        new 
WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfId, swfBeanList.size(), 
limit)));
-            } while (size != swfBeanList.size());
-
-            // Checking if sub workflow is ready to purge
-            List<String> children = fetchTerminatedWorkflow(swfBeanList);
-
-            // if all sub workflow ready to purge add them all and add current 
workflow
-            if(children.size() == swfBeanList.size()) {
-                subwfs.addAll(children);
-                wfsToPurge.add(wfId);
-            }
-        }
-        // Recurse on the children we just found to process their children
-        wfsToPurge.addAll(processWorkflowsHelper(subwfs));
-        return wfsToPurge;
+        return purgeableWorkflows;
+    }
+
+
+
+    private List<WorkflowJobBean> getSubWorkflowJobBeans(String wfId) throws 
JPAExecutorException {
+        int size;
+        List<WorkflowJobBean> swfBeanList = new ArrayList<>();
+        do {
+            size = swfBeanList.size();
+            swfBeanList.addAll(jpaService.execute(
+                    new 
WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfId, swfBeanList.size(), 
limit)));
+        } while (size != swfBeanList.size());
+        return swfBeanList;
     }
 
     /**
@@ -242,21 +297,28 @@ public class PurgeXCommand extends XCommand<Void> {
         List<String> children = new ArrayList<String>();
         long wfOlderThanMS = System.currentTimeMillis() - (wfOlderThan * 
DAY_IN_MS);
         for (WorkflowJobBean wfjBean : wfBeanList) {
-            final Date wfEndTime = wfjBean.getEndTime();
-            final boolean isFinished = wfjBean.inTerminalState();
-            if (isFinished && wfEndTime != null && wfEndTime.getTime() < 
wfOlderThanMS) {
-                    children.add(wfjBean.getId());
-            }
-            else {
-                final Date lastModificationTime = 
wfjBean.getLastModifiedTime();
-                if (isFinished && lastModificationTime != null && 
lastModificationTime.getTime() < wfOlderThanMS) {
-                    children.add(wfjBean.getId());
-                }
+            if (isWorkflowPurgeable(wfjBean, wfOlderThanMS)) {
+                children.add(wfjBean.getId());
             }
         }
         return children;
     }
 
+    private boolean isWorkflowPurgeable(WorkflowJobBean wfjBean, long 
wfOlderThanMS) {
+        final Date wfEndTime = wfjBean.getEndTime();
+        final boolean isFinished = wfjBean.inTerminalState();
+        if (isFinished && wfEndTime != null && wfEndTime.getTime() < 
wfOlderThanMS) {
+            return true;
+        }
+        else {
+            final Date lastModificationTime = wfjBean.getLastModifiedTime();
+            if (isFinished && lastModificationTime != null && 
lastModificationTime.getTime() < wfOlderThanMS) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     /**
      * Process coordinators to purge them and their children.
      *
diff --git a/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java 
b/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
index d11fcff..107547d 100644
--- a/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
@@ -43,6 +43,7 @@ import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.QueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
@@ -70,6 +71,7 @@ import org.apache.oozie.workflow.lite.StartNodeDef;
 
 public class TestPurgeXCommand extends XDataTestCase {
     private Services services;
+    private JPAService jpaService;
     private String[] excludedServices = { 
"org.apache.oozie.service.StatusTransitService",
             "org.apache.oozie.service.PauseTransitService", 
"org.apache.oozie.service.PurgeService",
             "org.apache.oozie.service.CoordMaterializeTriggerService", 
"org.apache.oozie.service.RecoveryService" };
@@ -80,6 +82,7 @@ public class TestPurgeXCommand extends XDataTestCase {
         services = new Services();
         setClassesToBeExcluded(services.getConf(), excludedServices);
         services.init();
+        jpaService = Services.get().get(JPAService.class);
     }
 
     @Override
@@ -2859,6 +2862,142 @@ public class TestPurgeXCommand extends XDataTestCase {
     }
 
     /**
+     * Test : The subsubworkflow shouldn't get purged,
+     *        the subworkflow should get purged,
+     *        the workflow parent should get purged --> neither will get purged
+     *
+     * @throws Exception if unable to create workflow job or action bean
+     */
+    public void testPurgeWFWithPurgeableSubWFNonPurgeableSubSubWF() throws 
Exception {
+        WorkflowJobBean wfJob = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), 
"1", WorkflowAction.Status.OK);
+        WorkflowJobBean subwfJob = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                wfJob.getId());
+        WorkflowActionBean subwfAction = 
addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowJobBean subsubwfJob = 
addRecordToWfJobTable(WorkflowJob.Status.RUNNING, 
WorkflowInstance.Status.RUNNING,
+                subwfJob.getId());
+        WorkflowActionBean subsubwfAction = 
addRecordToWfActionTable(subsubwfJob.getId(), "1", 
WorkflowAction.Status.RUNNING);
+
+        final int wfOlderThanDays = 7;
+        final int coordOlderThanDays = 1;
+        final int bundleOlderThanDays = 1;
+        final int limit = 10;
+        new PurgeXCommand(wfOlderThanDays, coordOlderThanDays, 
bundleOlderThanDays, limit).call();
+
+        assertWorkflowNotPurged(wfJob.getId());
+        assertWorkflowActionNotPurged(wfAction.getId());
+        assertWorkflowNotPurged(subwfJob.getId());
+        assertWorkflowActionNotPurged(subwfAction.getId());
+        assertWorkflowNotPurged(subsubwfJob.getId());
+        assertWorkflowActionNotPurged(subsubwfAction.getId());
+    }
+
+    /**
+     * Test : The subsubworkflow should get purged,
+     *        the subworkflow shouldn't get purged,
+     *        the workflow parent should get purged --> neither will get purged
+     *
+     * @throws Exception if unable to create workflow job or action bean
+     */
+    public void testPurgeWFWithNonPurgeableSubWFPurgeableSubSubWF() throws 
Exception {
+        WorkflowJobBean wfJob = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), 
"1", WorkflowAction.Status.OK);
+        WorkflowJobBean subwfJob = 
addRecordToWfJobTable(WorkflowJob.Status.RUNNING, 
WorkflowInstance.Status.RUNNING,
+                wfJob.getId());
+        WorkflowActionBean subwfAction = 
addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.RUNNING);
+        WorkflowJobBean subsubwfJob = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                subwfJob.getId());
+        WorkflowActionBean subsubwfAction = 
addRecordToWfActionTable(subsubwfJob.getId(), "1", WorkflowAction.Status.OK);
+
+        final int wfOlderThanDays = 7;
+        final int coordOlderThanDays = 1;
+        final int bundleOlderThanDays = 1;
+        final int limit = 10;
+        new PurgeXCommand(wfOlderThanDays, coordOlderThanDays, 
bundleOlderThanDays, limit).call();
+
+        assertWorkflowNotPurged(wfJob.getId());
+        assertWorkflowActionNotPurged(wfAction.getId());
+        assertWorkflowNotPurged(subwfJob.getId());
+        assertWorkflowActionNotPurged(subwfAction.getId());
+        assertWorkflowNotPurged(subsubwfJob.getId());
+        assertWorkflowActionNotPurged(subsubwfAction.getId());
+    }
+
+    /**
+     * Test : The subsubworkflows should get purged,
+     *        the subworkflow should get purged,
+     *        the workflow parent should get purged --> all will get purged
+     *
+     * @throws Exception if unable to create workflow job or action bean
+     */
+    public void testPurgeWFWithPurgeableSubWFPurgeableSubSubWF() throws 
Exception {
+        WorkflowJobBean wfJob = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), 
"1", WorkflowAction.Status.OK);
+        WorkflowJobBean subwfJob = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                wfJob.getId());
+        WorkflowActionBean subwfAction = 
addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowJobBean subsub1wfJob = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                subwfJob.getId());
+        WorkflowJobBean subsub2wfJob = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                subwfJob.getId());
+        WorkflowActionBean subsub1wfAction = 
addRecordToWfActionTable(subsub1wfJob.getId(), "1", WorkflowAction.Status.OK);
+        WorkflowActionBean subsub2wfAction = 
addRecordToWfActionTable(subsub2wfJob.getId(), "1", WorkflowAction.Status.OK);
+
+        final int wfOlderThanDays = 7;
+        final int coordOlderThanDays = 1;
+        final int bundleOlderThanDays = 1;
+        final int limit = 10;
+        new PurgeXCommand(wfOlderThanDays, coordOlderThanDays, 
bundleOlderThanDays, limit).call();
+
+        assertWorkflowPurged(wfJob.getId());
+        assertWorkflowActionPurged(wfAction.getId());
+        assertWorkflowPurged(subwfJob.getId());
+        assertWorkflowActionPurged(subwfAction.getId());
+        assertWorkflowPurged(subsub1wfJob.getId());
+        assertWorkflowPurged(subsub2wfJob.getId());
+        assertWorkflowActionPurged(subsub1wfAction.getId());
+        assertWorkflowActionPurged(subsub2wfAction.getId());
+    }
+
+    private void assertWorkflowNotPurged(String workflowId) {
+        try {
+            JPAExecutor jpaExecutor = new 
WorkflowJobGetJPAExecutor(workflowId);
+            jpaService.execute(jpaExecutor);
+        } catch (JPAExecutorException je) {
+            fail("Workflow job "+workflowId+" should not have been purged");
+        }
+    }
+
+    private void assertWorkflowPurged(String workflowId) {
+        try {
+            JPAExecutor jpaExecutor = new 
WorkflowJobGetJPAExecutor(workflowId);
+            jpaService.execute(jpaExecutor);
+            fail("Workflow job "+workflowId+" should have been purged");
+        } catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
+        }
+    }
+
+    private void assertWorkflowActionNotPurged(String workflowActionId) {
+        try {
+            JPAExecutor jpaExecutor = new 
WorkflowActionGetJPAExecutor(workflowActionId);
+            jpaService.execute(jpaExecutor);
+        } catch (JPAExecutorException je) {
+            fail("Workflow action "+workflowActionId+" should not have been 
purged");
+        }
+    }
+
+    private void assertWorkflowActionPurged(String workflowActionId) {
+        try {
+            JPAExecutor jpaExecutor = new 
WorkflowActionGetJPAExecutor(workflowActionId);
+            jpaService.execute(jpaExecutor);
+            fail("Workflow job "+workflowActionId+" should have been purged");
+        } catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+    }
+
+    /**
      * Test : The subworkflow should get purged, and the workflow parent 
should get purged --> both will get purged
      * Subworkflow has terminated, last modified time is known, but end time 
is null
      *
diff --git 
a/core/src/test/java/org/apache/oozie/command/TestSelectorTreeTraverser.java 
b/core/src/test/java/org/apache/oozie/command/TestSelectorTreeTraverser.java
new file mode 100644
index 0000000..4db16df
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/command/TestSelectorTreeTraverser.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command;
+
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSelectorTreeTraverser {
+    @Rule
+    public final ExpectedException expectedException = 
ExpectedException.none();
+
+    private PurgeXCommand.JPAFunction<String, List<String>> noChildren = new 
PurgeXCommand.JPAFunction<String, List<String>>() {
+        @Override
+        public List<String> apply(String wId) {
+            return new ArrayList<>();
+        }
+    };
+    private PurgeXCommand.JPAFunction<List<String>, List<String>> noneSelector 
= new PurgeXCommand.JPAFunction<List<String>,
+            List<String>>() {
+        @Override
+        public List<String> apply(List<String> jobBeans) {
+            return new ArrayList<>();
+        }
+    };
+    private PurgeXCommand.JPAFunction<List<String>, List<String>> allSelector 
= new PurgeXCommand.JPAFunction<List<String>,
+            List<String>>() {
+        @Override
+        public List<String> apply(List<String> jobBeans) {
+            return jobBeans;
+        }
+    };
+    private PurgeXCommand.JPAFunction<String, List<String>> simpleTree = new 
PurgeXCommand.JPAFunction<String, List<String>>() {
+        @Override
+        public List<String> apply(String wId) {
+            switch (wId) {
+                case "A":
+                    return Arrays.asList("B", "C");
+                case "B:":
+                case "C":
+                    return new ArrayList<>();
+            }
+            return new ArrayList<>();
+        }
+    };
+    private PurgeXCommand.JPAFunction<String, List<String>> invalidTree = new 
PurgeXCommand.JPAFunction<String, List<String>>() {
+        @Override
+        public List<String> apply(String wId)  {
+            switch (wId) {
+                case "A":
+                    return Arrays.asList("B", "C", "D");
+                case "B:":
+                case "C":
+                    return new ArrayList<>();
+                case "D":
+                    return Collections.singletonList("A");
+            }
+            return new ArrayList<>();
+        }
+    };
+
+    @Test
+    public void testSingleWorkflow() throws JPAExecutorException {
+        PurgeXCommand.SelectorTreeTraverser<String, String> traverser = new 
PurgeXCommand.SelectorTreeTraverser<>("A",
+                noChildren, noneSelector);
+        List<String> descendants = 
traverser.findAllDescendantNodesIfSelectable();
+        assertEquals(Collections.singletonList("A"), descendants);
+    }
+
+    @Test
+    public void testOneDepthTreeNotSelectedChildren() throws 
JPAExecutorException {
+        PurgeXCommand.SelectorTreeTraverser<String, String> traverser = new 
PurgeXCommand.SelectorTreeTraverser<>("A",
+                simpleTree, noneSelector);
+        List<String> descendants = 
traverser.findAllDescendantNodesIfSelectable();
+        assertEquals(Collections.<String>emptyList(), descendants);
+    }
+
+    @Test
+    public void testOneDepthTreeSelectedChildren() throws JPAExecutorException 
{
+        PurgeXCommand.SelectorTreeTraverser<String, String> traverser = new 
PurgeXCommand.SelectorTreeTraverser<>("A",
+                simpleTree, allSelector);
+        List<String> descendants = 
traverser.findAllDescendantNodesIfSelectable();
+        assertEquals(Arrays.asList("A", "B", "C"), descendants);
+    }
+
+    @Test
+    public void testInvalidTree() throws JPAExecutorException {
+        PurgeXCommand.SelectorTreeTraverser<String, String> traverser = new 
PurgeXCommand.SelectorTreeTraverser<>("A",
+                invalidTree, allSelector);
+        expectedException.expect(JPAExecutorException.class);
+        traverser.findAllDescendantNodesIfSelectable();
+    }
+
+}
\ No newline at end of file
diff --git a/release-log.txt b/release-log.txt
index 2171c4e..6211117 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.2.0 release (trunk - unreleased)
 
+OOZIE-3400 [core] Fix PurgeService sub-sub-workflow checking (asalamon74 via 
andras.piros)
 OOZIE-3410 [build] Deploy fluent-job and oozie-sharelib-git artifacts 
(andras.piros via gezapeti)
 OOZIE-3397 amend Improve logging in NotificationXCommand (asalamon74 via 
andras.piros)
 OOZIE-3398 [docs] Fix oozie sub-workflow documentation (asalamon74 via 
andras.piros)

Reply via email to