This is an automated email from the ASF dual-hosted git repository. andras pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/oozie.git
The following commit(s) were added to refs/heads/master by this push: new c15e4ed OOZIE-3400 [core] Fix PurgeService sub-sub-workflow checking (asalamon74 via andras.piros) c15e4ed is described below commit c15e4ed2590e6b91eaa3211b123e2fa95795eec0 Author: Andras Piros <andras.pi...@cloudera.com> AuthorDate: Wed Jan 2 11:33:18 2019 +0100 OOZIE-3400 [core] Fix PurgeService sub-sub-workflow checking (asalamon74 via andras.piros) --- core/src/main/java/org/apache/oozie/ErrorCode.java | 1 + .../org/apache/oozie/command/PurgeXCommand.java | 146 +++++++++++++++------ .../apache/oozie/command/TestPurgeXCommand.java | 139 ++++++++++++++++++++ .../oozie/command/TestSelectorTreeTraverser.java | 118 +++++++++++++++++ release-log.txt | 1 + 5 files changed, 363 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index 9cc153b..e274b9d 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -108,6 +108,7 @@ public enum ErrorCode { E0610(XLog.OPS, "Missing JPAService, StoreService cannot run without a JPAService"), E0611(XLog.OPS, "SQL error in operation [{0}], {1}"), E0612(XLog.OPS, "Could not get coordinator actions"), + E0613(XLog.OPS, "Workflow hierarchy error, cycle found {0}"), E0700(XLog.STD, "XML error, {0}"), E0701(XLog.STD, "XML schema error, {0}"), diff --git a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java index 42c3b28..b3bf30c 100644 --- a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java @@ -18,6 +18,7 @@ package org.apache.oozie.command; +import com.google.common.annotations.VisibleForTesting; import org.apache.oozie.ErrorCode; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.XException; @@ -44,8 +45,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; /** * This class is used to purge workflows, coordinators, and bundles. It takes into account the relationships between workflows and @@ -69,6 +72,65 @@ public class PurgeXCommand extends XCommand<Void> { private int bundleDel; private static final long DAY_IN_MS = 24 * 60 * 60 * 1000; + interface JPAFunction<T, R> { + R apply(T t) throws JPAExecutorException; + } + + final JPAFunction<String, List<WorkflowJobBean>> getSubWorkflowJobBeansFunction = new JPAFunction<String, + List<WorkflowJobBean>>() { + @Override + public List<WorkflowJobBean> apply(String wfId) throws JPAExecutorException { + return PurgeXCommand.this.getSubWorkflowJobBeans(wfId); + } + }; + + final JPAFunction<List<WorkflowJobBean>, List<String>> fetchTerminatedWorflowFunction = new JPAFunction<List<WorkflowJobBean>, + List<String>>() { + @Override + public List<String> apply(List<WorkflowJobBean> wfBeanList) throws JPAExecutorException { + return PurgeXCommand.this.fetchTerminatedWorkflow(wfBeanList); + } + }; + + @VisibleForTesting + static class SelectorTreeTraverser<T, U> { + final T rootNode; + final JPAFunction<T, List<U>> childrenFinder; + final JPAFunction<List<U>, List<T>> selector; + + SelectorTreeTraverser(final T rootNode, final JPAFunction<T, List<U>> childrenFinder, + final JPAFunction<List<U>, List<T>> selector) { + this.rootNode = rootNode; + this.childrenFinder = childrenFinder; + this.selector = selector; + } + + List<T> findAllDescendantNodesIfSelectable() throws JPAExecutorException { + List<T> allDescendantNodes = new ArrayList<>(); + Set<T> uniqueDescendantNodes = new HashSet<>(); + allDescendantNodes.add(rootNode); + uniqueDescendantNodes.add(rootNode); + int nextIndexToCheck = 0; + while (nextIndexToCheck < allDescendantNodes.size()) { + T id = allDescendantNodes.get(nextIndexToCheck); + List<U> childrenNodes = childrenFinder.apply(id); + List<T> selectedChildren = selector.apply(childrenNodes); + if (selectedChildren.size() == childrenNodes.size()) { + allDescendantNodes.addAll(selectedChildren); + uniqueDescendantNodes.addAll(selectedChildren); + if (allDescendantNodes.size() != uniqueDescendantNodes.size()) { + throw new JPAExecutorException(ErrorCode.E0613, rootNode); + } + } + else { + return new ArrayList<>(); + } + ++nextIndexToCheck; + } + return allDescendantNodes; + } + } + public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) { this(wfOlderThan, coordOlderThan, bundleOlderThan, limit, false); } @@ -185,52 +247,45 @@ public class PurgeXCommand extends XCommand<Void> { } /** - * Process workflows to purge them and their children. Uses the processWorkflowsHelper method to help via recursion to make - * sure that the workflow children are deleted before their parents. + * Process workflows to purge them and their children if all the descendants are purgeable. Skip the workflows that have + * non-purgeable descendants. * * @param wfs List of workflows to process * @throws JPAExecutorException If a JPA executor has a problem */ private void processWorkflows(List<String> wfs) throws JPAExecutorException { - List<String> wfsToPurge = processWorkflowsHelper(wfs); + List<String> wfsToPurge = findPurgeableWorkflows(wfs); purgeWorkflows(wfsToPurge); } /** - * Used by the processWorkflows method and via recursion. + * Get purgeable workflow list. * - * @param wfs List of workflows to process + * @param workflows List of workflows to process * @return List of workflows to purge * @throws JPAExecutorException If a JPA executor has a problem */ - private List<String> processWorkflowsHelper(List<String> wfs) throws JPAExecutorException { - // If the list is empty, then we've finished recursing - if (wfs.isEmpty()) { - return wfs; + private List<String> findPurgeableWorkflows(List<String> workflows) throws JPAExecutorException { + List<String> purgeableWorkflows = new ArrayList<>(); + for (String workflowId : workflows) { + SelectorTreeTraverser<String, WorkflowJobBean> selectorTreeTraverser = new SelectorTreeTraverser<>(workflowId, + getSubWorkflowJobBeansFunction, fetchTerminatedWorflowFunction); + purgeableWorkflows.addAll(selectorTreeTraverser.findAllDescendantNodesIfSelectable()); } - List<String> subwfs = new ArrayList<String>(); - List<String> wfsToPurge = new ArrayList<String>(); - for (String wfId : wfs) { - int size; - List<WorkflowJobBean> swfBeanList = new ArrayList<WorkflowJobBean>(); - do { - size = swfBeanList.size(); - swfBeanList.addAll(jpaService.execute( - new WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfId, swfBeanList.size(), limit))); - } while (size != swfBeanList.size()); - - // Checking if sub workflow is ready to purge - List<String> children = fetchTerminatedWorkflow(swfBeanList); - - // if all sub workflow ready to purge add them all and add current workflow - if(children.size() == swfBeanList.size()) { - subwfs.addAll(children); - wfsToPurge.add(wfId); - } - } - // Recurse on the children we just found to process their children - wfsToPurge.addAll(processWorkflowsHelper(subwfs)); - return wfsToPurge; + return purgeableWorkflows; + } + + + + private List<WorkflowJobBean> getSubWorkflowJobBeans(String wfId) throws JPAExecutorException { + int size; + List<WorkflowJobBean> swfBeanList = new ArrayList<>(); + do { + size = swfBeanList.size(); + swfBeanList.addAll(jpaService.execute( + new WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfId, swfBeanList.size(), limit))); + } while (size != swfBeanList.size()); + return swfBeanList; } /** @@ -242,21 +297,28 @@ public class PurgeXCommand extends XCommand<Void> { List<String> children = new ArrayList<String>(); long wfOlderThanMS = System.currentTimeMillis() - (wfOlderThan * DAY_IN_MS); for (WorkflowJobBean wfjBean : wfBeanList) { - final Date wfEndTime = wfjBean.getEndTime(); - final boolean isFinished = wfjBean.inTerminalState(); - if (isFinished && wfEndTime != null && wfEndTime.getTime() < wfOlderThanMS) { - children.add(wfjBean.getId()); - } - else { - final Date lastModificationTime = wfjBean.getLastModifiedTime(); - if (isFinished && lastModificationTime != null && lastModificationTime.getTime() < wfOlderThanMS) { - children.add(wfjBean.getId()); - } + if (isWorkflowPurgeable(wfjBean, wfOlderThanMS)) { + children.add(wfjBean.getId()); } } return children; } + private boolean isWorkflowPurgeable(WorkflowJobBean wfjBean, long wfOlderThanMS) { + final Date wfEndTime = wfjBean.getEndTime(); + final boolean isFinished = wfjBean.inTerminalState(); + if (isFinished && wfEndTime != null && wfEndTime.getTime() < wfOlderThanMS) { + return true; + } + else { + final Date lastModificationTime = wfjBean.getLastModifiedTime(); + if (isFinished && lastModificationTime != null && lastModificationTime.getTime() < wfOlderThanMS) { + return true; + } + } + return false; + } + /** * Process coordinators to purge them and their children. * diff --git a/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java b/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java index d11fcff..107547d 100644 --- a/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java @@ -43,6 +43,7 @@ import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; +import org.apache.oozie.executor.jpa.JPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.QueryExecutor; import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; @@ -70,6 +71,7 @@ import org.apache.oozie.workflow.lite.StartNodeDef; public class TestPurgeXCommand extends XDataTestCase { private Services services; + private JPAService jpaService; private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService", "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService", "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" }; @@ -80,6 +82,7 @@ public class TestPurgeXCommand extends XDataTestCase { services = new Services(); setClassesToBeExcluded(services.getConf(), excludedServices); services.init(); + jpaService = Services.get().get(JPAService.class); } @Override @@ -2859,6 +2862,142 @@ public class TestPurgeXCommand extends XDataTestCase { } /** + * Test : The subsubworkflow shouldn't get purged, + * the subworkflow should get purged, + * the workflow parent should get purged --> neither will get purged + * + * @throws Exception if unable to create workflow job or action bean + */ + public void testPurgeWFWithPurgeableSubWFNonPurgeableSubSubWF() throws Exception { + WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); + WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK); + WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + wfJob.getId()); + WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK); + WorkflowJobBean subsubwfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING, + subwfJob.getId()); + WorkflowActionBean subsubwfAction = addRecordToWfActionTable(subsubwfJob.getId(), "1", WorkflowAction.Status.RUNNING); + + final int wfOlderThanDays = 7; + final int coordOlderThanDays = 1; + final int bundleOlderThanDays = 1; + final int limit = 10; + new PurgeXCommand(wfOlderThanDays, coordOlderThanDays, bundleOlderThanDays, limit).call(); + + assertWorkflowNotPurged(wfJob.getId()); + assertWorkflowActionNotPurged(wfAction.getId()); + assertWorkflowNotPurged(subwfJob.getId()); + assertWorkflowActionNotPurged(subwfAction.getId()); + assertWorkflowNotPurged(subsubwfJob.getId()); + assertWorkflowActionNotPurged(subsubwfAction.getId()); + } + + /** + * Test : The subsubworkflow should get purged, + * the subworkflow shouldn't get purged, + * the workflow parent should get purged --> neither will get purged + * + * @throws Exception if unable to create workflow job or action bean + */ + public void testPurgeWFWithNonPurgeableSubWFPurgeableSubSubWF() throws Exception { + WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); + WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK); + WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING, + wfJob.getId()); + WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.RUNNING); + WorkflowJobBean subsubwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + subwfJob.getId()); + WorkflowActionBean subsubwfAction = addRecordToWfActionTable(subsubwfJob.getId(), "1", WorkflowAction.Status.OK); + + final int wfOlderThanDays = 7; + final int coordOlderThanDays = 1; + final int bundleOlderThanDays = 1; + final int limit = 10; + new PurgeXCommand(wfOlderThanDays, coordOlderThanDays, bundleOlderThanDays, limit).call(); + + assertWorkflowNotPurged(wfJob.getId()); + assertWorkflowActionNotPurged(wfAction.getId()); + assertWorkflowNotPurged(subwfJob.getId()); + assertWorkflowActionNotPurged(subwfAction.getId()); + assertWorkflowNotPurged(subsubwfJob.getId()); + assertWorkflowActionNotPurged(subsubwfAction.getId()); + } + + /** + * Test : The subsubworkflows should get purged, + * the subworkflow should get purged, + * the workflow parent should get purged --> all will get purged + * + * @throws Exception if unable to create workflow job or action bean + */ + public void testPurgeWFWithPurgeableSubWFPurgeableSubSubWF() throws Exception { + WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); + WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK); + WorkflowJobBean subwfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + wfJob.getId()); + WorkflowActionBean subwfAction = addRecordToWfActionTable(subwfJob.getId(), "1", WorkflowAction.Status.OK); + WorkflowJobBean subsub1wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + subwfJob.getId()); + WorkflowJobBean subsub2wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + subwfJob.getId()); + WorkflowActionBean subsub1wfAction = addRecordToWfActionTable(subsub1wfJob.getId(), "1", WorkflowAction.Status.OK); + WorkflowActionBean subsub2wfAction = addRecordToWfActionTable(subsub2wfJob.getId(), "1", WorkflowAction.Status.OK); + + final int wfOlderThanDays = 7; + final int coordOlderThanDays = 1; + final int bundleOlderThanDays = 1; + final int limit = 10; + new PurgeXCommand(wfOlderThanDays, coordOlderThanDays, bundleOlderThanDays, limit).call(); + + assertWorkflowPurged(wfJob.getId()); + assertWorkflowActionPurged(wfAction.getId()); + assertWorkflowPurged(subwfJob.getId()); + assertWorkflowActionPurged(subwfAction.getId()); + assertWorkflowPurged(subsub1wfJob.getId()); + assertWorkflowPurged(subsub2wfJob.getId()); + assertWorkflowActionPurged(subsub1wfAction.getId()); + assertWorkflowActionPurged(subsub2wfAction.getId()); + } + + private void assertWorkflowNotPurged(String workflowId) { + try { + JPAExecutor jpaExecutor = new WorkflowJobGetJPAExecutor(workflowId); + jpaService.execute(jpaExecutor); + } catch (JPAExecutorException je) { + fail("Workflow job "+workflowId+" should not have been purged"); + } + } + + private void assertWorkflowPurged(String workflowId) { + try { + JPAExecutor jpaExecutor = new WorkflowJobGetJPAExecutor(workflowId); + jpaService.execute(jpaExecutor); + fail("Workflow job "+workflowId+" should have been purged"); + } catch (JPAExecutorException je) { + assertEquals(ErrorCode.E0604, je.getErrorCode()); + } + } + + private void assertWorkflowActionNotPurged(String workflowActionId) { + try { + JPAExecutor jpaExecutor = new WorkflowActionGetJPAExecutor(workflowActionId); + jpaService.execute(jpaExecutor); + } catch (JPAExecutorException je) { + fail("Workflow action "+workflowActionId+" should not have been purged"); + } + } + + private void assertWorkflowActionPurged(String workflowActionId) { + try { + JPAExecutor jpaExecutor = new WorkflowActionGetJPAExecutor(workflowActionId); + jpaService.execute(jpaExecutor); + fail("Workflow job "+workflowActionId+" should have been purged"); + } catch (JPAExecutorException je) { + assertEquals(ErrorCode.E0605, je.getErrorCode()); + } + } + + /** * Test : The subworkflow should get purged, and the workflow parent should get purged --> both will get purged * Subworkflow has terminated, last modified time is known, but end time is null * diff --git a/core/src/test/java/org/apache/oozie/command/TestSelectorTreeTraverser.java b/core/src/test/java/org/apache/oozie/command/TestSelectorTreeTraverser.java new file mode 100644 index 0000000..4db16df --- /dev/null +++ b/core/src/test/java/org/apache/oozie/command/TestSelectorTreeTraverser.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command; + +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class TestSelectorTreeTraverser { + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + private PurgeXCommand.JPAFunction<String, List<String>> noChildren = new PurgeXCommand.JPAFunction<String, List<String>>() { + @Override + public List<String> apply(String wId) { + return new ArrayList<>(); + } + }; + private PurgeXCommand.JPAFunction<List<String>, List<String>> noneSelector = new PurgeXCommand.JPAFunction<List<String>, + List<String>>() { + @Override + public List<String> apply(List<String> jobBeans) { + return new ArrayList<>(); + } + }; + private PurgeXCommand.JPAFunction<List<String>, List<String>> allSelector = new PurgeXCommand.JPAFunction<List<String>, + List<String>>() { + @Override + public List<String> apply(List<String> jobBeans) { + return jobBeans; + } + }; + private PurgeXCommand.JPAFunction<String, List<String>> simpleTree = new PurgeXCommand.JPAFunction<String, List<String>>() { + @Override + public List<String> apply(String wId) { + switch (wId) { + case "A": + return Arrays.asList("B", "C"); + case "B:": + case "C": + return new ArrayList<>(); + } + return new ArrayList<>(); + } + }; + private PurgeXCommand.JPAFunction<String, List<String>> invalidTree = new PurgeXCommand.JPAFunction<String, List<String>>() { + @Override + public List<String> apply(String wId) { + switch (wId) { + case "A": + return Arrays.asList("B", "C", "D"); + case "B:": + case "C": + return new ArrayList<>(); + case "D": + return Collections.singletonList("A"); + } + return new ArrayList<>(); + } + }; + + @Test + public void testSingleWorkflow() throws JPAExecutorException { + PurgeXCommand.SelectorTreeTraverser<String, String> traverser = new PurgeXCommand.SelectorTreeTraverser<>("A", + noChildren, noneSelector); + List<String> descendants = traverser.findAllDescendantNodesIfSelectable(); + assertEquals(Collections.singletonList("A"), descendants); + } + + @Test + public void testOneDepthTreeNotSelectedChildren() throws JPAExecutorException { + PurgeXCommand.SelectorTreeTraverser<String, String> traverser = new PurgeXCommand.SelectorTreeTraverser<>("A", + simpleTree, noneSelector); + List<String> descendants = traverser.findAllDescendantNodesIfSelectable(); + assertEquals(Collections.<String>emptyList(), descendants); + } + + @Test + public void testOneDepthTreeSelectedChildren() throws JPAExecutorException { + PurgeXCommand.SelectorTreeTraverser<String, String> traverser = new PurgeXCommand.SelectorTreeTraverser<>("A", + simpleTree, allSelector); + List<String> descendants = traverser.findAllDescendantNodesIfSelectable(); + assertEquals(Arrays.asList("A", "B", "C"), descendants); + } + + @Test + public void testInvalidTree() throws JPAExecutorException { + PurgeXCommand.SelectorTreeTraverser<String, String> traverser = new PurgeXCommand.SelectorTreeTraverser<>("A", + invalidTree, allSelector); + expectedException.expect(JPAExecutorException.class); + traverser.findAllDescendantNodesIfSelectable(); + } + +} \ No newline at end of file diff --git a/release-log.txt b/release-log.txt index 2171c4e..6211117 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.2.0 release (trunk - unreleased) +OOZIE-3400 [core] Fix PurgeService sub-sub-workflow checking (asalamon74 via andras.piros) OOZIE-3410 [build] Deploy fluent-job and oozie-sharelib-git artifacts (andras.piros via gezapeti) OOZIE-3397 amend Improve logging in NotificationXCommand (asalamon74 via andras.piros) OOZIE-3398 [docs] Fix oozie sub-workflow documentation (asalamon74 via andras.piros)