Repository: oozie Updated Branches: refs/heads/master ab3a17497 -> 198f5c2a5
OOZIE-1492 Make sure HA works with HCat (ryota) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/198f5c2a Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/198f5c2a Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/198f5c2a Branch: refs/heads/master Commit: 198f5c2a5e7fce71d6fa638d2d26fa006fee171a Parents: ab3a174 Author: egashira <[email protected]> Authored: Mon Jun 16 01:22:04 2014 -0700 Committer: egashira <[email protected]> Committed: Mon Jun 16 01:23:29 2014 -0700 ---------------------------------------------------------------------- .../org/apache/oozie/CoordinatorActionBean.java | 3 + .../coord/CoordPushDependencyCheckXCommand.java | 4 + .../hcat/EhcacheHCatDependencyCache.java | 54 ++++ .../dependency/hcat/HCatDependencyCache.java | 12 + .../hcat/SimpleHCatDependencyCache.java | 118 ++++++++- .../executor/jpa/CoordActionQueryExecutor.java | 21 +- .../oozie/service/JobsConcurrencyService.java | 8 + .../PartitionDependencyManagerService.java | 93 +++++++ .../oozie/service/ZKJobsConcurrencyService.java | 10 + .../TestCoordActionInputCheckXCommand.java | 5 +- .../TestCoordPushDependencyCheckXCommand.java | 1 - ...TestHAPartitionDependencyManagerEhCache.java | 42 +++ ...TestHAPartitionDependencyManagerService.java | 257 +++++++++++++++++++ .../TestPartitionDependencyManagerEhcache.java | 1 - .../TestPartitionDependencyManagerService.java | 36 +-- .../java/org/apache/oozie/test/XTestCase.java | 10 +- .../java/org/apache/oozie/test/ZKXTestCase.java | 2 +- release-log.txt | 1 + 18 files changed, 653 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java 
b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java index 8cbcc4f..51eaf2d 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java @@ -136,6 +136,9 @@ import org.json.simple.JSONObject; // Query to retrieve status of Coordinator actions @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_UNIGNORED", query = "select a.statusStr from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'IGNORED'"), + // Query to retrieve status of Coordinator actions + @NamedQuery(name = "GET_COORD_ACTION_STATUS", query = "select a.statusStr from CoordinatorActionBean a where a.id = :id"), + @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"), @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a where a.lastModifiedTimestamp >= :lastModifiedTime"), http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java index 2e5cd47..ae71924 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java @@ -23,6 +23,7 @@ import java.net.URI; import java.util.Arrays; import java.util.Date; import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorJobBean; @@ -43,6 +44,7 @@ import org.apache.oozie.executor.jpa.JPAExecutorException; import 
org.apache.oozie.service.CallableQueueService; import org.apache.oozie.service.EventHandlerService; import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.PartitionDependencyManagerService; import org.apache.oozie.service.RecoveryService; import org.apache.oozie.service.Service; import org.apache.oozie.service.Services; @@ -207,6 +209,8 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> protected void onAllPushDependenciesAvailable() throws CommandException { coordAction.setPushMissingDependencies(""); + Services.get().get(PartitionDependencyManagerService.class) + .removeCoordActionWithDependenciesAvailable(coordAction.getId()); if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) { Date nominalTime = coordAction.getNominalTime(); Date currentTime = new Date(); http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java index 6f127c4..5743c15 100644 --- a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java +++ b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java @@ -17,13 +17,16 @@ */ package org.apache.oozie.dependency.hcat; +import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -472,4 +475,55 @@ public class EhcacheHCatDependencyCache implements 
HCatDependencyCache, CacheEve } } + @Override + public void removeNonWaitingCoordActions(Set<String> staleActions) { + Iterator<String> serverItr = missingDepsByServer.keySet().iterator(); + while (serverItr.hasNext()) { + String server = serverItr.next(); + Cache missingCache = missingDepsByServer.get(server); + if (missingCache == null) { + continue; + } + synchronized (missingCache) { + for (Object key : missingCache.getKeys()) { + Element element = missingCache.get(key); + if (element == null) { + continue; + } + Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue()) + .getWaitingActions(); + Iterator<WaitingAction> wactionItr = waitingActions.iterator(); + HCatURI hcatURI = null; + while(wactionItr.hasNext()) { + WaitingAction waction = wactionItr.next(); + if(staleActions.contains(waction.getActionID())) { + try { + hcatURI = new HCatURI(waction.getDependencyURI()); + wactionItr.remove(); + } + catch (URISyntaxException e) { + continue; + } + } + } + if (waitingActions.isEmpty() && hcatURI != null) { + missingCache.remove(key); + // Decrement partition key pattern count if the cache entry is removed + SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap()); + String partKeys = sortedPKV.getPartKeys(); + String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER + + hcatURI.getTable(); + String hcatURIStr = hcatURI.toURIString(); + decrementPartKeyPatternCount(tableKey, partKeys, hcatURIStr); + } + } + } + } + } + + @Override + public void removeCoordActionWithDependenciesAvailable(String coordAction) { + // to be implemented when reverse-lookup data structure for purging is added + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java 
b/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java index df3afd3..e1e770f 100644 --- a/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java +++ b/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java @@ -19,6 +19,7 @@ package org.apache.oozie.dependency.hcat; import java.util.Collection; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.util.HCatURI; @@ -89,4 +90,15 @@ public interface HCatDependencyCache { * Destroy the cache */ public void destroy(); + + /** + * Purge stale actions + */ + public void removeNonWaitingCoordActions(Set<String> coordActions); + + /** + * Remove coordAction when all dependencies met + */ + public void removeCoordActionWithDependenciesAvailable(String coordAction); + } http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java index e8e3ebc..08aa8f9 100644 --- a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java +++ b/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java @@ -17,14 +17,17 @@ */ package org.apache.oozie.dependency.hcat; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -51,13 +54,16 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache { */ 
private ConcurrentMap<String, Collection<String>> availableDeps; - // TODO: - // Gather and print stats on cache hits and misses. + /** + * Map of actionIDs and partitions for reverse-lookup in purging + */ + private ConcurrentMap<String, ConcurrentMap<String, Collection<String>>> actionPartitionMap; @Override public void init(Configuration conf) { missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>(); availableDeps = new ConcurrentHashMap<String, Collection<String>>(); + actionPartitionMap = new ConcurrentHashMap<String, ConcurrentMap<String, Collection<String>>>(); } @Override @@ -77,6 +83,23 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache { partKeyPatterns = existingMap; } } + ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID); + if (partitionMap == null) { + partitionMap = new ConcurrentHashMap<String, Collection<String>>(); + ConcurrentMap<String, Collection<String>> existingPartMap = actionPartitionMap.putIfAbsent(actionID, + partitionMap); + if (existingPartMap != null) { + partitionMap = existingPartMap; + } + } + synchronized (partitionMap) { + Collection<String> partKeys = partitionMap.get(tableKey); + if (partKeys == null) { + partKeys = new ArrayList<String>(); + } + partKeys.add(partKey); + partitionMap.put(tableKey, partKeys); + } synchronized (partKeyPatterns) { missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); @@ -105,6 +128,22 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache { hcatURI.toURIString(), actionID); return false; } + ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID); + if (partitionMap != null) { + synchronized (partitionMap) { + Collection<String> partKeys = partitionMap.get(tableKey); + if (partKeys != null) { + 
partKeys.remove(partKey); + } + if (partKeys.size() == 0) { + partitionMap.remove(tableKey); + } + if (partitionMap.size() == 0) { + actionPartitionMap.remove(actionID); + } + } + } + synchronized(partKeyPatterns) { Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); if (partValues == null) { @@ -292,7 +331,82 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache { public String getPartVals() { return partVals.toString(); } + } + + private HCatURI removePartitions(String coordActionId, Collection<String> partKeys, + Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns) { + HCatURI hcatUri = null; + for (String partKey : partKeys) { + Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey); + Iterator<String> partValItr = partValues.keySet().iterator(); + while (partValItr.hasNext()) { + String partVal = partValItr.next(); + Collection<WaitingAction> waitingActions = partValues.get(partVal); + if (waitingActions != null) { + Iterator<WaitingAction> waitItr = waitingActions.iterator(); + while (waitItr.hasNext()) { + WaitingAction waction = waitItr.next(); + if (coordActionId.contains(waction.getActionID())) { + waitItr.remove(); + if (hcatUri == null) { + try { + hcatUri = new HCatURI(waction.getDependencyURI()); + } + catch (URISyntaxException e) { + continue; + } + } + } + } + } + // delete partition value with no waiting actions + if (waitingActions.size() == 0) { + partValItr.remove(); + } + } + if (partValues.size() == 0) { + partKeyPatterns.remove(partKey); + } + } + return hcatUri; + } + @Override + public void removeNonWaitingCoordActions(Set<String> coordActions) { + HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); + for (String coordActionId : coordActions) { + synchronized (actionPartitionMap) { + Map<String, Collection<String>> partitionMap = actionPartitionMap.get(coordActionId); + if (partitionMap != null) { + Iterator<String> 
tableItr = partitionMap.keySet().iterator(); + while (tableItr.hasNext()) { + String tableKey = tableItr.next(); + HCatURI hcatUri = null; + Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey); + if (partKeyPatterns != null) { + synchronized (partKeyPatterns) { + Collection<String> partKeys = partitionMap.get(tableKey); + if (partKeys != null) { + hcatUri = removePartitions(coordActionId, partKeys, partKeyPatterns); + } + } + if (partKeyPatterns.size() == 0) { + tableItr.remove(); + if (hcatUri != null) { + // Close JMS session. Stop listening on topic + hcatService.unregisterFromNotification(hcatUri); + } + } + } + } + } + actionPartitionMap.remove(coordActionId); + } + } } + @Override + public void removeCoordActionWithDependenciesAvailable(String coordAction) { + actionPartitionMap.remove(coordAction); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java index f5304ca..d56af7b 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java @@ -25,10 +25,16 @@ import java.util.List; import javax.persistence.EntityManager; import javax.persistence.Query; +import org.apache.oozie.BinaryBlob; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.ErrorCode; +import org.apache.oozie.StringBlob; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; +import 
org.apache.oozie.util.DateUtils; import com.google.common.annotations.VisibleForTesting; @@ -48,6 +54,7 @@ public class CoordActionQueryExecutor extends UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, UPDATE_COORD_ACTION_RERUN, GET_COORD_ACTION, + GET_COORD_ACTION_STATUS, GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID, GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME }; @@ -174,6 +181,7 @@ public class CoordActionQueryExecutor extends CoordActionQuery caQuery = (CoordActionQuery) namedQuery; switch (caQuery) { case GET_COORD_ACTION: + case GET_COORD_ACTION_STATUS: query.setParameter("id", parameters[0]); break; case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: @@ -198,10 +206,11 @@ public class CoordActionQueryExecutor extends public CoordinatorActionBean get(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException { EntityManager em = jpaService.getEntityManager(); Query query = getSelectQuery(namedQuery, em, parameters); - CoordinatorActionBean bean = (CoordinatorActionBean) jpaService.executeGet(namedQuery.name(), query, em); - if (bean == null) { + Object ret = jpaService.executeGet(namedQuery.name(), query, em); + if (ret == null) { throw new JPAExecutorException(ErrorCode.E0605, query.toString()); } + CoordinatorActionBean bean = constructBean(namedQuery, ret); return bean; } @@ -222,11 +231,19 @@ public class CoordActionQueryExecutor extends private CoordinatorActionBean constructBean(CoordActionQuery namedQuery, Object ret) throws JPAExecutorException { CoordinatorActionBean bean; + Object[] arr; switch (namedQuery) { + case GET_COORD_ACTION: + bean = (CoordinatorActionBean) ret; + break; case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: bean = new CoordinatorActionBean(); bean.setJobId((String) ret); break; + case GET_COORD_ACTION_STATUS: + bean = new CoordinatorActionBean(); + bean.setStatusStr((String)ret); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " + namedQuery.name()); 
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java index 27c97e6..36adbd6 100644 --- a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java +++ b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java @@ -134,4 +134,12 @@ public class JobsConcurrencyService implements Service, Instrumentable { public boolean isAllServerRequest(Map<String, String[]> params) { return false; } + + /** + * Check if it is running in HA mode + * @return false + */ + public boolean isHighlyAvailableMode(){ + return false; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java b/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java index 985dcab..41d1ba2 100644 --- a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java +++ b/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java @@ -18,16 +18,28 @@ package org.apache.oozie.service; import java.util.Collection; +import java.util.Date; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency; 
import org.apache.oozie.dependency.hcat.HCatDependencyCache; import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; import org.apache.oozie.util.HCatURI; import org.apache.oozie.util.XLog; +import com.google.common.annotations.VisibleForTesting; + /** * Module that functions like a caching service to maintain partition dependency mappings */ @@ -35,11 +47,20 @@ public class PartitionDependencyManagerService implements Service { public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService."; public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl"; + public static final String CACHE_PURGE_INTERVAL = CONF_PREFIX + "cache.purge.interval"; + public static final String CACHE_PURGE_TTL = CONF_PREFIX + "cache.purge.ttl"; private static XLog LOG = XLog.getLog(PartitionDependencyManagerService.class); private HCatDependencyCache dependencyCache; + /** + * Keep timestamp when missing dependencies of a coord action are registered + */ + private ConcurrentMap<String, Long> registeredCoordActionMap; + + private boolean purgeEnabled = false; + @Override public void init(Services services) throws ServiceException { init(services.getConf()); @@ -52,6 +73,57 @@ public class PartitionDependencyManagerService implements Service { dependencyCache.init(conf); LOG.info("PartitionDependencyManagerService initialized. 
Dependency cache is {0} ", dependencyCache.getClass() .getName()); + purgeEnabled = Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode(); + if (purgeEnabled) { + Runnable purgeThread = new CachePurgeWorker(dependencyCache); + // schedule runnable by default every 10 min + Services.get() + .get(SchedulerService.class) + .schedule(purgeThread, 10, Services.get().getConf().getInt(CACHE_PURGE_INTERVAL, 600), + SchedulerService.Unit.SEC); + registeredCoordActionMap = new ConcurrentHashMap<String, Long>(); + } + } + + private class CachePurgeWorker implements Runnable { + HCatDependencyCache cache; + public CachePurgeWorker(HCatDependencyCache cache) { + this.cache = cache; + } + + @Override + public void run() { + if (Thread.currentThread().isInterrupted()) { + return; + } + try { + purgeMissingDependency(Services.get().getConf().getInt(CACHE_PURGE_TTL, 1800)); + } + catch (Throwable error) { + XLog.getLog(PartitionDependencyManagerService.class).debug("Throwable in CachePurgeWorker thread run : ", error); + } + } + + private void purgeMissingDependency(int timeToLive) { + long currentTime = new Date().getTime(); + Set<String> staleActions = new HashSet<String>(); + for(String actionId : registeredCoordActionMap.keySet()) { + Long regTime = registeredCoordActionMap.get(actionId); + if(regTime < (currentTime - timeToLive * 1000)){ + CoordinatorActionBean caBean = null; + try { + caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_STATUS, actionId); + } + catch (JPAExecutorException e) { + LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e); + } + if(caBean != null && !caBean.getStatus().equals(CoordinatorAction.Status.WAITING)){ + staleActions.add(actionId); + } + } + } + dependencyCache.removeNonWaitingCoordActions(staleActions); + } } @Override @@ -71,6 +143,9 @@ public class PartitionDependencyManagerService implements Service { * @param actionID ID of action which is waiting for the 
dependency */ public void addMissingDependency(HCatURI hcatURI, String actionID) { + if (purgeEnabled) { + registeredCoordActionMap.put(actionID, new Date().getTime()); + } dependencyCache.addMissingDependency(hcatURI, actionID); } @@ -142,4 +217,22 @@ public class PartitionDependencyManagerService implements Service { return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs); } + /** + * Remove a coord action from dependency cache when all push missing dependencies available + * + * @param actionID action id + * @param dependencyURIs set of dependency URIs + * @return true if successful, else false + */ + public void removeCoordActionWithDependenciesAvailable(String actionID) { + if (purgeEnabled) { + registeredCoordActionMap.remove(actionID); + } + dependencyCache.removeCoordActionWithDependenciesAvailable(actionID); + } + + @VisibleForTesting + public void runCachePurgeWorker() { + new CachePurgeWorker(dependencyCache).run(); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java index 611b74c..1d5f4a4 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java @@ -207,4 +207,14 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements return params == null || params.get(RestConstants.ALL_SERVER_REQUEST) == null || params.isEmpty() || !params.get(RestConstants.ALL_SERVER_REQUEST)[0].equalsIgnoreCase("false"); } + + /** + * Return if it is running in HA mode + * + * @return + */ + @Override + public boolean isHighlyAvailableMode() { + return true; + } } 
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java index 1ffadbd..0eafacf 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java @@ -268,8 +268,9 @@ public class TestCoordActionInputCheckXCommand extends XDataTestCase { } public void testActionInputCheckLatestActionCreationTimeWithPushDependency() throws Exception { + setupServicesForHCatalog(services); Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, false); - + services.init(); String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C"; Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ); Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ); @@ -400,7 +401,9 @@ public class TestCoordActionInputCheckXCommand extends XDataTestCase { } public void testActionInputCheckLatestCurrentTimeWithPushDependency() throws Exception { + setupServicesForHCatalog(services); Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, true); + services.init(); String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C"; Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ); http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java ---------------------------------------------------------------------- diff --git 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java index da09727..3c8b082 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java @@ -464,5 +464,4 @@ public class TestCoordPushDependencyCheckXCommand extends XDataTestCase { throw new Exception("Action ID " + actionId + " was not stored properly in db"); } } - } http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerEhCache.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerEhCache.java b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerEhCache.java new file mode 100644 index 0000000..9d3165d --- /dev/null +++ b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerEhCache.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.service; + +import org.apache.oozie.dependency.hcat.EhcacheHCatDependencyCache; + +public class TestHAPartitionDependencyManagerEhCache extends TestHAPartitionDependencyManagerService { + + protected void setUp() throws Exception { + super.setUp(); + services.getConf().set(PartitionDependencyManagerService.CACHE_MANAGER_IMPL, + EhcacheHCatDependencyCache.class.getName()); + services.setService(ZKJobsConcurrencyService.class); + PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class); + pdms.init(services); + } + + @Override + public void testDependencyCacheWithHA(){ + } + + @Override + public void testPurgeMissingDependencies() throws Exception { + PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class); + testPurgeMissingDependenciesForCache(pdms); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java new file mode 100644 index 0000000..da383b3 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.service; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.util.Shell; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.client.CoordinatorAction.Status; +import org.apache.oozie.client.rest.JsonBean; +import org.apache.oozie.dependency.hcat.HCatMessageHandler; +import org.apache.oozie.executor.jpa.BatchQueryExecutor; +import org.apache.oozie.service.RecoveryService.RecoveryRunnable; +import org.apache.oozie.test.ZKXTestCase; +import org.apache.oozie.util.HCatURI; + +public class TestHAPartitionDependencyManagerService extends ZKXTestCase { + + protected Services services; + protected String server; + protected String db; + protected String table1; + protected String table2; + protected String part1; + protected String part2; + protected String part3; + + protected void setUp() throws Exception { + super.setUp(); + services = super.setupServicesForHCatalog(Services.get()); + // disable recovery service + services.getConf().setInt(RecoveryService.CONF_SERVICE_INTERVAL, 1000000); + // disable regular cache purge + services.getConf().setInt(PartitionDependencyManagerService.CACHE_PURGE_INTERVAL, 1000000); + server = super.getHCatalogServer().getMetastoreAuthority(); + services.init(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + } + + private void populateTable() throws Exception { + dropTable(db, table1, true); + dropTable(db, table2, true); + dropDatabase(db, true); + createDatabase(db); + createTable(db, 
table1, "dt,country"); + createTable(db, table2, "dt,country"); + } + + protected String getSanitizedTestCaseDir() { + // On Windows, the working directory will have a colon from the drive letter. Because colons + // are not allowed in DFS paths, we remove it. Also, prepend a backslash to simulate an absolute path. + if(Shell.WINDOWS) { + return "\\" + getTestCaseDir().replaceAll(":", ""); + } + else { + return getTestCaseDir(); + } + } + + public void testDependencyCacheWithHA() throws Exception { + + db = "default"; + table1 = "mytbl"; + table2 = "mytb2"; + part1 = "dt=20120101;country=us"; + part2 = "dt=20120102;country=us"; + part3 = "dt=20120103;country=us"; + String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table1 + "/" + part1; + String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table1 + "/" + part2; + String newHCatDependency3 = "hcat://" + server + "/" + db + "/" + table2 + "/" + part3; + HCatURI dep1 = new HCatURI(newHCatDependency1); + HCatURI dep2 = new HCatURI(newHCatDependency2); + HCatURI dep3 = new HCatURI(newHCatDependency3); + // create db, table and partitions + populateTable(); + + String actionId1 = addInitRecords(newHCatDependency1); + String actionId2 = addInitRecords(newHCatDependency2); + String actionId3 = addInitRecords(newHCatDependency3); + + // Assume dependency cache on dummy server with missing push dependencies registered + PartitionDependencyManagerService dummyPdms = new PartitionDependencyManagerService(); + PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class); + dummyPdms.init(Services.get()); + dummyPdms.addMissingDependency(dep1, actionId1); + dummyPdms.addMissingDependency(dep2, actionId2); + dummyPdms.addMissingDependency(dep3, actionId3); + + Collection<String> waitingActions = (Collection<String>)dummyPdms.getWaitingActions(dep1); + assertEquals(1, waitingActions.size()); + waitingActions = 
(Collection<String>)dummyPdms.getWaitingActions(dep2); + assertEquals(1, waitingActions.size()); + waitingActions = (Collection<String>)dummyPdms.getWaitingActions(dep3); + assertEquals(1, waitingActions.size()); + + //Dependency cache on living server doesn't have these partitions registered at this point + waitingActions = (Collection<String>)pdms.getWaitingActions(dep1); + assertNull(waitingActions); + waitingActions = (Collection<String>)pdms.getWaitingActions(dep2); + assertNull(waitingActions); + waitingActions = (Collection<String>)pdms.getWaitingActions(dep3); + assertNull(waitingActions); + + //Assume dummy server is down, and recovery service on living server picks up these jobs + dummyPdms.destroy(); + Runnable recoveryRunnable = new RecoveryRunnable(60, 0, 60); + recoveryRunnable.run(); + waitFor(30 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + Collection<String> waitingActions; + PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class); + HCatURI dep1 = new HCatURI("hcat://"+ server + "/" + db + "/" + table1 + "/" + part1); + HCatURI dep2 = new HCatURI("hcat://"+ server + "/" + db + "/" + table1 + "/" + part2); + HCatURI dep3 = new HCatURI("hcat://"+ server + "/" + db + "/" + table2 + "/" + part3); + waitingActions = pdms.getWaitingActions(dep1); + if(waitingActions == null) { + return false; + } + waitingActions = pdms.getWaitingActions(dep2); + if(waitingActions == null) { + return false; + } + waitingActions = pdms.getWaitingActions(dep3); + if(waitingActions == null) { + return false; + } + return true; + } + }); + //Dependency cache on living server has missing partitions added + waitingActions = (Collection<String>)pdms.getWaitingActions(dep1); + assertEquals(1, waitingActions.size()); + assertTrue(waitingActions.contains(actionId1)); + waitingActions = (Collection<String>)pdms.getWaitingActions(dep2); + assertEquals(1, waitingActions.size()); + 
assertTrue(waitingActions.contains(actionId2)); + waitingActions = (Collection<String>)pdms.getWaitingActions(dep3); + assertEquals(1, waitingActions.size()); + assertTrue(waitingActions.contains(actionId3)); + + HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); + // mytbl and mytb2 registered to topic map to receive notification + assertTrue(hcatService.isRegisteredForNotification(dep1)); + assertTrue(hcatService.isRegisteredForNotification(dep2)); + assertTrue(hcatService.isRegisteredForNotification(dep3)); + } + + protected void addMissingDependencyAndRegister(HCatURI hcatURI, String actionId, PartitionDependencyManagerService pdms) { + pdms.addMissingDependency(hcatURI, actionId); + HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); + if (!hcatService.isRegisteredForNotification(hcatURI)) { + hcatService.registerForNotification(hcatURI, hcatURI.getDb() + "." + hcatURI.getTable(), + new HCatMessageHandler(hcatURI.getServer())); + } + } + + public void testPurgeMissingDependencies() throws Exception{ + services.setService(ZKJobsConcurrencyService.class); + PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class); + pdms.init(services); + testPurgeMissingDependenciesForCache(pdms); + } + + protected void testPurgeMissingDependenciesForCache(PartitionDependencyManagerService pdms) throws Exception{ + + String actionId1 = "1234465451"; + String actionId2 = "1234465452"; + String actionId3 = "1234465453"; + + // add partitions as missing + HCatURI dep1 = new HCatURI("hcat://hcat-server1.domain.com:5080/mydb/mytbl1/dt=20120101;country=us"); + HCatURI dep2 = new HCatURI("hcat://hcat-server1.domain.com:5080/mydb/mytbl1/country=us;dt=20120101"); + HCatURI dep3 = new HCatURI("hcat://hcat-server2.domain.com:5080/mydb/mytbl2/dt=20120102;country=us"); + + // actionId1-->(dep1,2), actionId2-->(dep2), actionId3-->(dep2,3) + addMissingDependencyAndRegister(dep1, actionId1, pdms); 
+ addMissingDependencyAndRegister(dep2, actionId1, pdms); + addMissingDependencyAndRegister(dep2, actionId2, pdms); + addMissingDependencyAndRegister(dep2, actionId3, pdms); + addMissingDependencyAndRegister(dep3, actionId3, pdms); + + List<String> waitingDep1 = (ArrayList<String>) pdms.getWaitingActions(dep1); + assertEquals(waitingDep1.size(), 1); + assertEquals(waitingDep1.get(0), actionId1); + + List<String> waitingDep2 = (ArrayList<String>) pdms.getWaitingActions(dep2); + assertEquals(waitingDep2.size(), 3); + for (String id : waitingDep2) { + assertTrue(id.equals(actionId1) || id.equals(actionId2) || id.equals(actionId3)); + } + List<String> waitingDep3 = (ArrayList<String>) pdms.getWaitingActions(dep3); + assertEquals(waitingDep3.size(), 1); + assertTrue(waitingDep3.get(0).equals(actionId3)); + + // set only coordAction 1 to WAITING and the rest to RUNNING (only WAITING + // actions remain in the dependency cache) + ArrayList<JsonBean> insertList = new ArrayList<JsonBean>(); + CoordinatorActionBean coordAction1 = new CoordinatorActionBean(); + coordAction1.setId(actionId1); + coordAction1.setStatus(Status.WAITING); + insertList.add(coordAction1); + CoordinatorActionBean coordAction2 = new CoordinatorActionBean(); + coordAction2.setId(actionId2); + coordAction2.setStatus(Status.RUNNING); + insertList.add(coordAction2); + CoordinatorActionBean coordAction3 = new CoordinatorActionBean(); + coordAction3.setId(actionId3); + coordAction3.setStatus(Status.RUNNING); + insertList.add(coordAction3); + BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); + + // run cache purge + Services.get().getConf().setInt(PartitionDependencyManagerService.CACHE_PURGE_TTL, 0); + pdms.runCachePurgeWorker(); + + // only coord Action 1 still in dependency cache + waitingDep1 = (ArrayList<String>) pdms.getWaitingActions(dep1); + assertEquals(waitingDep1.size(), 1); + assertTrue(waitingDep1.get(0).equals(actionId1)); + + // only coord Action 1 still in dependency 
cache + waitingDep2 = (ArrayList<String>) pdms.getWaitingActions(dep2); + assertEquals(waitingDep2.size(), 1); + assertTrue(waitingDep2.get(0).equals(actionId1)); + + waitingDep3 = (ArrayList<String>) pdms.getWaitingActions(dep3); + assertNull(waitingDep3); + + HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); + // mytbl1 should be still in topic map + assertTrue(hcatService.isRegisteredForNotification(dep1)); + // mytbl1 should be still in topic map + assertTrue(hcatService.isRegisteredForNotification(dep2)); + // mytbl2 should NOT be in topic map + assertFalse(hcatService.isRegisteredForNotification(dep3)); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java index cfdfbd1..7b88a19 100644 --- a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java +++ b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java @@ -131,5 +131,4 @@ public class TestPartitionDependencyManagerEhcache extends TestPartitionDependen assertTrue(dep.toURIString() + " is missing in cache", waitingActions.contains(actionID)); } } - } http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java index ef71fb0..67ea851 100644 --- 
a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java +++ b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java @@ -20,16 +20,20 @@ package org.apache.oozie.service; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.client.CoordinatorAction.Status; +import org.apache.oozie.client.rest.JsonBean; import org.apache.oozie.dependency.hcat.HCatMessageHandler; +import org.apache.oozie.executor.jpa.BatchQueryExecutor; import org.apache.oozie.jms.JMSConnectionInfo; import org.apache.oozie.service.Services; import org.apache.oozie.test.XDataTestCase; import org.apache.oozie.util.HCatURI; import org.apache.oozie.util.XLog; -import org.junit.After; -import org.junit.Before; import org.junit.Test; /** @@ -40,14 +44,15 @@ public class TestPartitionDependencyManagerService extends XDataTestCase { private static XLog LOG = XLog.getLog(TestPartitionDependencyManagerService.class); protected Services services; - @Before + protected void setUp() throws Exception { super.setUp(); services = super.setupServicesForHCatalog(); + // disable regular cache purge + services.getConf().setInt(PartitionDependencyManagerService.CACHE_PURGE_INTERVAL, 1000000); services.init(); } - @After protected void tearDown() throws Exception { Services.get().destroy(); super.tearDown(); @@ -55,6 +60,7 @@ public class TestPartitionDependencyManagerService extends XDataTestCase { @Test public void testPartitionDependency() throws Exception { + // Test all APIs related to dependency caching String actionId1 = "1234465451"; String actionId2 = "1234465452"; @@ -72,15 +78,16 @@ public class TestPartitionDependencyManagerService extends XDataTestCase { HCatURI dep3 = new 
HCatURI("hcat://hcat-server2.domain.com:5080/mydb/mytbl2/dt=20120102;country=us"); HCatURI dep4 = new HCatURI("hcat://hcat-server2.domain.com:5080/mydb/mytbl2/dt=20120102;country=us;state=CA"); - addMissingDependencyAndRegister(dep1, actionId1); - addMissingDependencyAndRegister(dep2, actionId1); - addMissingDependencyAndRegister(dep2, actionId2); - addMissingDependencyAndRegister(dep2, actionId3); - addMissingDependencyAndRegister(dep3, actionId3); - addMissingDependencyAndRegister(dep4, actionId4); + PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class); + addMissingDependencyAndRegister(dep1, actionId1, pdms); + addMissingDependencyAndRegister(dep2, actionId1, pdms); + addMissingDependencyAndRegister(dep2, actionId2, pdms); + addMissingDependencyAndRegister(dep2, actionId3, pdms); + addMissingDependencyAndRegister(dep3, actionId3, pdms); + addMissingDependencyAndRegister(dep4, actionId4, pdms); // Add duplicates. RecoveryService will add duplicates - addMissingDependencyAndRegister(dep4, actionId4); - addMissingDependencyAndRegister(dep4, actionId4); + addMissingDependencyAndRegister(dep4, actionId4, pdms); + addMissingDependencyAndRegister(dep4, actionId4, pdms); HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class); @@ -90,7 +97,6 @@ public class TestPartitionDependencyManagerService extends XDataTestCase { assertTrue(jmsService.isListeningToTopic(connInfo, dep1.getDb() + "." + dep1.getTable())); assertTrue(jmsService.isListeningToTopic(connInfo, dep3.getDb() + "." 
+ dep3.getTable())); - PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class); assertTrue(pdms.getWaitingActions(dep1).contains(actionId1)); assertTrue(pdms.getWaitingActions(dep2).contains(actionId1)); assertTrue(pdms.getWaitingActions(dep2).contains(actionId2)); @@ -130,8 +136,7 @@ public class TestPartitionDependencyManagerService extends XDataTestCase { assertFalse(jmsService.isListeningToTopic(connInfo, dep3.getDb() + "." + dep3.getTable())); } - private void addMissingDependencyAndRegister(HCatURI hcatURI, String actionId) { - PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class); + protected void addMissingDependencyAndRegister(HCatURI hcatURI, String actionId, PartitionDependencyManagerService pdms) { pdms.addMissingDependency(hcatURI, actionId); HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); if (!hcatService.isRegisteredForNotification(hcatURI)) { @@ -191,5 +196,4 @@ public class TestPartitionDependencyManagerService extends XDataTestCase { assertTrue(dep.toURIString() + " is missing in cache", waitingActions.contains(actionID)); } } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/test/XTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java index 1536927..6bf0a8f 100644 --- a/core/src/test/java/org/apache/oozie/test/XTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java @@ -1068,6 +1068,11 @@ public abstract class XTestCase extends TestCase { protected Services setupServicesForHCatalog() throws ServiceException { Services services = new Services(); + setupServicesForHCataLogImpl(services); + return services; + } + + private void setupServicesForHCataLogImpl(Services services) { 
Configuration conf = services.getConf(); conf.set(Services.CONF_SERVICE_EXT_CLASSES, JMSAccessorService.class.getName() + "," + @@ -1081,8 +1086,11 @@ public abstract class XTestCase extends TestCase { FSURIHandler.class.getName() + "," + HCatURIHandler.class.getName()); setSystemProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); setSystemProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false"); - return services; } + protected Services setupServicesForHCatalog(Services services) throws ServiceException { + setupServicesForHCataLogImpl(services); + return services; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java index 7bebaf0..3d37d48 100644 --- a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java @@ -49,7 +49,7 @@ import org.apache.oozie.util.ZKUtils; * <p> * To use security, see {@link ZKXTestCaseWithSecurity}. 
*/ -public abstract class ZKXTestCase extends XTestCase { +public abstract class ZKXTestCase extends XDataTestCase { private TestingServer zkServer; private CuratorFramework client = null; private ServiceDiscovery<Map> sDiscovery = null; http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index e311c7f..2f89c3e 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1492 Make sure HA works with HCat (ryota) OOZIE-1869 Sharelib update shows vip/load balancer address as one of the hostname (puru via ryota) OOZIE-1861 Pig action should work with tez mode (rohini) OOZIE-1703 User should be able to set coord end-time before start time (puru via rohini)
