Repository: oozie
Updated Branches:
  refs/heads/master 6ea1ed833 -> 8d381099b
OOZIE-2227 PartitionDependencyManagerService keeps on purging delete coord actions

Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8d381099
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8d381099
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8d381099

Branch: refs/heads/master
Commit: 8d381099b5541986535c70050d48269869ff007a
Parents: 6ea1ed8
Author: Purshotam Shah <[email protected]>
Authored: Wed May 6 13:55:16 2015 -0700
Committer: Purshotam Shah <[email protected]>
Committed: Wed May 6 13:55:16 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/oozie/BundleEngine.java | 2 +-
 .../org/apache/oozie/CoordinatorEngine.java | 2 +-
 .../main/java/org/apache/oozie/DagEngine.java | 2 +-
 .../PartitionDependencyManagerService.java | 13 ++++-
 ...TestHAPartitionDependencyManagerService.java | 57 +++++++++++++++++++-
 release-log.txt | 1 +
 6 files changed, 72 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/main/java/org/apache/oozie/BundleEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java
index bfb29e6..4f0f3bf 100644
--- a/core/src/main/java/org/apache/oozie/BundleEngine.java
+++ b/core/src/main/java/org/apache/oozie/BundleEngine.java
@@ -292,7 +292,7 @@ public class BundleEngine extends BaseEngine {
             if (lastTime == null) {
                 lastTime = new Date();
             }
-            fetchLog(filter, job.getStartTime(), lastTime, writer, params, logType);
+            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
         }
         catch (Exception ex) {
             throw new IOException(ex);

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
index cb67be0..482d6a9 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
@@ -347,7 +347,7 @@ public class CoordinatorEngine extends BaseEngine {
             if (lastTime == null) {
                 lastTime = new Date();
             }
-            fetchLog(filter, job.getStartTime(), lastTime, writer, params, logType);
+            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
         }
         catch (Exception e) {
             throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/main/java/org/apache/oozie/DagEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java b/core/src/main/java/org/apache/oozie/DagEngine.java
index 08a126c..3d09206 100644
--- a/core/src/main/java/org/apache/oozie/DagEngine.java
+++ b/core/src/main/java/org/apache/oozie/DagEngine.java
@@ -458,7 +458,7 @@ public class DagEngine extends BaseEngine {
             if (lastTime == null) {
                 lastTime = job.getLastModifiedTime();
             }
-            fetchLog(filter, job.getStartTime(), lastTime, writer, params, logType);
+            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
         }
         catch (Exception e) {
             throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java b/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
index c8f2b20..a350772 100644
--- a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
+++ b/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.service;
 
+import java.text.MessageFormat;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashSet;
@@ -27,9 +28,11 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
 import org.apache.oozie.dependency.hcat.HCatDependencyCache;
@@ -119,7 +122,15 @@ public class PartitionDependencyManagerService implements Service {
                         caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_STATUS, actionId);
                     }
                     catch (JPAExecutorException e) {
-                        LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e);
+                        if (e.getErrorCode() == ErrorCode.E0605) {
+                            LOG.info(MessageFormat.format(
+                                    "Coord action {0} is not in database, deleting it from cache", actionId));
+                            staleActions.add(actionId);
+                            actionItr.remove();
+                        }
+                        else {
+                            LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e);
+                        }
                     }
                     if(caBean != null && !caBean.getStatus().equals(CoordinatorAction.Status.WAITING)){
                         staleActions.add(actionId);

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
index 031e3e4..d681d42 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
@@ -28,9 +28,10 @@ import org.apache.oozie.client.CoordinatorAction.Status;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.dependency.hcat.HCatMessageHandler;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.service.RecoveryService.RecoveryRunnable;
+import org.apache.oozie.executor.jpa.CoordActionsDeleteJPAExecutor;
 import org.apache.oozie.test.ZKXTestCase;
 import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.service.RecoveryService.RecoveryRunnable;
 
 public class TestHAPartitionDependencyManagerService extends ZKXTestCase {
 
@@ -255,4 +256,58 @@ public class TestHAPartitionDependencyManagerService extends ZKXTestCase {
         // mytbl2 should NOT be in topic map
         assertFalse(hcatService.isRegisteredForNotification(dep3));
     }
+
+    public void testCheckAfterActionDelete() throws Exception {
+        Services.get().setService(ZKJobsConcurrencyService.class);
+        Services.get().get(ConfigurationService.class).getConf()
+                .setInt(PartitionDependencyManagerService.CACHE_PURGE_TTL, 0);
+
+        db = "default";
+        table1 = "mytbl";
+        table2 = "mytb2";
+        part1 = "dt=20120101;country=us";
+        part2 = "dt=20120102;country=us";
+        part3 = "dt=20120103;country=us";
+        String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table1 + "/" + part1;
+        String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table1 + "/" + part2;
+        String newHCatDependency3 = "hcat://" + server + "/" + db + "/" + table2 + "/" + part3;
+        HCatURI dep1 = new HCatURI(newHCatDependency1);
+        HCatURI dep2 = new HCatURI(newHCatDependency2);
+        HCatURI dep3 = new HCatURI(newHCatDependency3);
+        // create db, table and partitions
+        populateTable();
+
+        String actionId1 = addInitRecords(newHCatDependency1);
+        String actionId2 = addInitRecords(newHCatDependency2);
+        String actionId3 = addInitRecords(newHCatDependency3);
+
+        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+        pdms.init(Services.get());
+        pdms.addMissingDependency(dep1, actionId1);
+        pdms.addMissingDependency(dep2, actionId2);
+        pdms.addMissingDependency(dep3, actionId3);
+        pdms.runCachePurgeWorker();
+
+        assertNotNull((Collection<String>) pdms.getWaitingActions(dep1));
+        assertNotNull((Collection<String>) pdms.getWaitingActions(dep2));
+        assertNotNull((Collection<String>) pdms.getWaitingActions(dep3));
+
+        List<String> deleteList = new ArrayList<String>();
+        deleteList.add(actionId1);
+        JPAService jpaService = Services.get().get(JPAService.class);
+        jpaService.execute(new CoordActionsDeleteJPAExecutor(deleteList));
+        pdms.runCachePurgeWorker();
+
+        assertNull((Collection<String>) pdms.getWaitingActions(dep1));
+        assertNotNull((Collection<String>) pdms.getWaitingActions(dep2));
+        assertNotNull((Collection<String>) pdms.getWaitingActions(dep3));
+        deleteList.clear();
+        deleteList.add(actionId2);
+        jpaService.execute(new CoordActionsDeleteJPAExecutor(deleteList));
+        pdms.runCachePurgeWorker();
+        assertNull((Collection<String>) pdms.getWaitingActions(dep1));
+        assertNull((Collection<String>) pdms.getWaitingActions(dep2));
+        assertNotNull((Collection<String>) pdms.getWaitingActions(dep3));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 7e85b2c..a2c0fe6 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-2227 PartitionDependencyManagerService keeps on purging delete coord actions (puru)
 OOZIE-2163 Remove CoordinatorStore (seoeun25 via bzhang)
 OOZIE-2221 Oozie audit log has null id for some of input request (puru)
 OOZIE-2223 Improve documentation with regard to Java action retries (ben.roling via bzhang)
