Repository: oozie
Updated Branches:
  refs/heads/master 1502633e7 -> 39e2ed5d9
OOZIE-2358 Coord rerun cleanup should reuse hcat connections (rohini)

Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/39e2ed5d
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/39e2ed5d
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/39e2ed5d

Branch: refs/heads/master
Commit: 39e2ed5d93e03da03de3c9f4f4e23f4003c741b6
Parents: 1502633
Author: Rohini Palaniswamy <[email protected]>
Authored: Mon Sep 14 14:43:33 2015 -0700
Committer: Rohini Palaniswamy <[email protected]>
Committed: Mon Sep 14 14:43:33 2015 -0700

----------------------------------------------------------------------
 .../oozie/command/coord/CoordRerunXCommand.java | 78 +++++++++++---------
 .../apache/oozie/dependency/HCatURIHandler.java | 14 +++-
 release-log.txt | 1 +
 3 files changed, 54 insertions(+), 39 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/oozie/blob/39e2ed5d/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
index 19b9219..72e0f75 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
@@ -26,6 +26,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
@@ -141,10 +142,11 @@ public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActio
      * @param eAction coordinator action xml
      */
     @SuppressWarnings("unchecked")
-    private void cleanupOutputEvents(Element eAction)
+    private void cleanupOutputEvents(Element eAction,
Configuration coordJobConf, Map<String, Context> uriHandlerContextMap) throws CommandException { Element outputList = eAction.getChild("output-events", eAction.getNamespace()); if (outputList != null) { + for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { String nocleanup = data.getAttributeValue("nocleanup"); if (data.getChild("uris", data.getNamespace()) != null @@ -152,25 +154,16 @@ public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActio String uris = data.getChild("uris", data.getNamespace()).getTextTrim(); if (uris != null) { String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR); - Configuration actionConf = null; - try { - actionConf = new XConfiguration(new StringReader(coordJob.getConf())); - } - catch (IOException e) { - throw new CommandException(ErrorCode.E0907, - "failed to read coord job conf to clean up output data"); - } - HashMap<String, Context> contextMap = new HashMap<String, Context>(); try { for (String uriStr : uriArr) { URI uri = new URI(uriStr); URIHandler handler = Services.get().get(URIHandlerService.class).getURIHandler(uri); String schemeWithAuthority = uri.getScheme() + "://" + uri.getAuthority(); - if (!contextMap.containsKey(schemeWithAuthority)) { - Context context = handler.getContext(uri, actionConf, coordJob.getUser(), false); - contextMap.put(schemeWithAuthority, context); + if (!uriHandlerContextMap.containsKey(schemeWithAuthority)) { + Context context = handler.getContext(uri, coordJobConf, coordJob.getUser(), false); + uriHandlerContextMap.put(schemeWithAuthority, context); } - handler.delete(uri, contextMap.get(schemeWithAuthority)); + handler.delete(uri, uriHandlerContextMap.get(schemeWithAuthority)); LOG.info("Cleanup the output data " + uri.toString()); } } @@ -180,18 +173,10 @@ public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActio catch (URIHandlerException e) { throw new CommandException(ErrorCode.E0907, 
e.getMessage()); } - finally { - Iterator<Entry<String, Context>> itr = contextMap.entrySet().iterator(); - while (itr.hasNext()) { - Entry<String, Context> entry = itr.next(); - entry.getValue().destroy(); - itr.remove(); - } - } } - } } + } else { LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup"); @@ -363,21 +348,42 @@ public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActio InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); List<CoordinatorActionBean> coordActions = CoordUtils.getCoordActions(rerunType, jobId, scope, false); if (checkAllActionsRunnable(coordActions)) { - for (CoordinatorActionBean coordAction : coordActions) { - String actionXml = coordAction.getActionXml(); - if (!noCleanup) { - Element eAction = XmlUtils.parseXml(actionXml); - cleanupOutputEvents(eAction); - } - if (refresh) { - refreshAction(coordJob, coordAction); + Map<String, Context> uriHandlerContextMap = new HashMap<String, Context>(); + Configuration coordJobConf = null; + try { + coordJobConf = new XConfiguration(new StringReader(coordJob.getConf())); + } + catch (IOException e) { + throw new CommandException(ErrorCode.E0907, "failed to read coord job conf to clean up output data"); + } + try { + for (CoordinatorActionBean coordAction : coordActions) { + String actionXml = coordAction.getActionXml(); + if (!noCleanup) { + Element eAction = XmlUtils.parseXml(actionXml); + cleanupOutputEvents(eAction, coordJobConf, uriHandlerContextMap); + } + if (refresh) { + refreshAction(coordJob, coordAction); + } + updateAction(coordJob, coordAction); + if (SLAService.isEnabled()) { + SLAOperations.updateRegistrationEvent(coordAction.getId()); + } + queue(new CoordActionNotificationXCommand(coordAction), 100); + queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100); + if (coordAction.getPushMissingDependencies() != null) { + queue(new 
CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100); + } } - updateAction(coordJob, coordAction); - if (SLAService.isEnabled()) { - SLAOperations.updateRegistrationEvent(coordAction.getId()); + } + finally { + Iterator<Entry<String, Context>> itr = uriHandlerContextMap.entrySet().iterator(); + while (itr.hasNext()) { + Entry<String, Context> entry = itr.next(); + entry.getValue().destroy(); + itr.remove(); } - queue(new CoordActionNotificationXCommand(coordAction), 100); - queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100); } } else { http://git-wip-us.apache.org/repos/asf/oozie/blob/39e2ed5d/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java index 566d40d..1bbf37d 100644 --- a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java +++ b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java @@ -31,7 +31,6 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.hcatalog.api.ConnectionFailureException; import org.apache.hive.hcatalog.api.HCatClient; @@ -279,8 +278,9 @@ public class HCatURIHandler implements URIHandler { ugi.addToken(token); } finally { - if (tokenClient != null) + if (tokenClient != null) { tokenClient.close(); + } } } XLog.getLog(HCatURIHandler.class).info( @@ -415,10 +415,18 @@ public class HCatURIHandler implements URIHandler { @Override public void destroy() { try { - hcatClient.close(); + if (delegationToken != null && !delegationToken.isEmpty()) { + hcatClient.cancelDelegationToken(delegationToken); + } delegationToken = null; } catch (Exception 
ignore) { + XLog.getLog(HCatContext.class).warn("Error cancelling delegation token", ignore); + } + try { + hcatClient.close(); + } + catch (Exception ignore) { XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/39e2ed5d/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index bf84976..d533d78 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2358 Coord rerun cleanup should reuse hcat connections (rohini) OOZIE-2356 Add a way to enable/disable credentials in a workflow (rkanter) OOZIE-2355 Hive2 Action doesn't pass along oozie configs to jobconf (rkanter) OOZIE-2318 Provide better solution for specifying SSL truststore to Oozie Client (rkanter)
