Repository: oozie
Updated Branches:
  refs/heads/master 1502633e7 -> 39e2ed5d9


OOZIE-2358 Coord rerun cleanup should reuse hcat connections (rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/39e2ed5d
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/39e2ed5d
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/39e2ed5d

Branch: refs/heads/master
Commit: 39e2ed5d93e03da03de3c9f4f4e23f4003c741b6
Parents: 1502633
Author: Rohini Palaniswamy <[email protected]>
Authored: Mon Sep 14 14:43:33 2015 -0700
Committer: Rohini Palaniswamy <[email protected]>
Committed: Mon Sep 14 14:43:33 2015 -0700

----------------------------------------------------------------------
 .../oozie/command/coord/CoordRerunXCommand.java | 78 +++++++++++---------
 .../apache/oozie/dependency/HCatURIHandler.java | 14 +++-
 release-log.txt                                 |  1 +
 3 files changed, 54 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/39e2ed5d/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java 
b/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
index 19b9219..72e0f75 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
@@ -26,6 +26,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
@@ -141,10 +142,11 @@ public class CoordRerunXCommand extends 
RerunTransitionXCommand<CoordinatorActio
      * @param eAction coordinator action xml
      */
     @SuppressWarnings("unchecked")
-    private void cleanupOutputEvents(Element eAction)
+    private void cleanupOutputEvents(Element eAction, Configuration 
coordJobConf, Map<String, Context> uriHandlerContextMap)
             throws CommandException {
         Element outputList = eAction.getChild("output-events", 
eAction.getNamespace());
         if (outputList != null) {
+
             for (Element data : (List<Element>) 
outputList.getChildren("data-out", eAction.getNamespace())) {
                 String nocleanup = data.getAttributeValue("nocleanup");
                 if (data.getChild("uris", data.getNamespace()) != null
@@ -152,25 +154,16 @@ public class CoordRerunXCommand extends 
RerunTransitionXCommand<CoordinatorActio
                     String uris = data.getChild("uris", 
data.getNamespace()).getTextTrim();
                     if (uris != null) {
                         String[] uriArr = 
uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
-                        Configuration actionConf = null;
-                        try {
-                            actionConf = new XConfiguration(new 
StringReader(coordJob.getConf()));
-                        }
-                        catch (IOException e) {
-                            throw new CommandException(ErrorCode.E0907,
-                                    "failed to read coord job conf to clean up 
output data");
-                        }
-                        HashMap<String, Context> contextMap = new 
HashMap<String, Context>();
                         try {
                             for (String uriStr : uriArr) {
                                 URI uri = new URI(uriStr);
                                 URIHandler handler = 
Services.get().get(URIHandlerService.class).getURIHandler(uri);
                                 String schemeWithAuthority = uri.getScheme() + 
"://" + uri.getAuthority();
-                                if 
(!contextMap.containsKey(schemeWithAuthority)) {
-                                    Context context = handler.getContext(uri, 
actionConf, coordJob.getUser(), false);
-                                    contextMap.put(schemeWithAuthority, 
context);
+                                if 
(!uriHandlerContextMap.containsKey(schemeWithAuthority)) {
+                                    Context context = handler.getContext(uri, 
coordJobConf, coordJob.getUser(), false);
+                                    
uriHandlerContextMap.put(schemeWithAuthority, context);
                                 }
-                                handler.delete(uri, 
contextMap.get(schemeWithAuthority));
+                                handler.delete(uri, 
uriHandlerContextMap.get(schemeWithAuthority));
                                 LOG.info("Cleanup the output data " + 
uri.toString());
                             }
                         }
@@ -180,18 +173,10 @@ public class CoordRerunXCommand extends 
RerunTransitionXCommand<CoordinatorActio
                         catch (URIHandlerException e) {
                             throw new CommandException(ErrorCode.E0907, 
e.getMessage());
                         }
-                        finally {
-                            Iterator<Entry<String, Context>> itr = 
contextMap.entrySet().iterator();
-                            while (itr.hasNext()) {
-                                Entry<String, Context> entry = itr.next();
-                                entry.getValue().destroy();
-                                itr.remove();
-                            }
-                        }
                     }
-
                 }
             }
+
         }
         else {
             LOG.info("No output-events defined in coordinator xml. Therefore 
nothing to cleanup");
@@ -363,21 +348,42 @@ public class CoordRerunXCommand extends 
RerunTransitionXCommand<CoordinatorActio
             InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
             List<CoordinatorActionBean> coordActions = 
CoordUtils.getCoordActions(rerunType, jobId, scope, false);
             if (checkAllActionsRunnable(coordActions)) {
-                for (CoordinatorActionBean coordAction : coordActions) {
-                    String actionXml = coordAction.getActionXml();
-                    if (!noCleanup) {
-                        Element eAction = XmlUtils.parseXml(actionXml);
-                        cleanupOutputEvents(eAction);
-                    }
-                    if (refresh) {
-                        refreshAction(coordJob, coordAction);
+                Map<String, Context> uriHandlerContextMap = new 
HashMap<String, Context>();
+                Configuration coordJobConf = null;
+                try {
+                    coordJobConf = new XConfiguration(new 
StringReader(coordJob.getConf()));
+                }
+                catch (IOException e) {
+                    throw new CommandException(ErrorCode.E0907, "failed to 
read coord job conf to clean up output data");
+                }
+                try {
+                    for (CoordinatorActionBean coordAction : coordActions) {
+                        String actionXml = coordAction.getActionXml();
+                        if (!noCleanup) {
+                            Element eAction = XmlUtils.parseXml(actionXml);
+                            cleanupOutputEvents(eAction, coordJobConf, 
uriHandlerContextMap);
+                        }
+                        if (refresh) {
+                            refreshAction(coordJob, coordAction);
+                        }
+                        updateAction(coordJob, coordAction);
+                        if (SLAService.isEnabled()) {
+                            
SLAOperations.updateRegistrationEvent(coordAction.getId());
+                        }
+                        queue(new 
CoordActionNotificationXCommand(coordAction), 100);
+                        queue(new 
CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 
100);
+                        if (coordAction.getPushMissingDependencies() != null) {
+                            queue(new 
CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
+                        }
                     }
-                    updateAction(coordJob, coordAction);
-                    if (SLAService.isEnabled()) {
-                        
SLAOperations.updateRegistrationEvent(coordAction.getId());
+                }
+                finally {
+                    Iterator<Entry<String, Context>> itr = 
uriHandlerContextMap.entrySet().iterator();
+                    while (itr.hasNext()) {
+                        Entry<String, Context> entry = itr.next();
+                        entry.getValue().destroy();
+                        itr.remove();
                     }
-                    queue(new CoordActionNotificationXCommand(coordAction), 
100);
-                    queue(new 
CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 
100);
                 }
             }
             else {

http://git-wip-us.apache.org/repos/asf/oozie/blob/39e2ed5d/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java 
b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
index 566d40d..1bbf37d 100644
--- a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
+++ b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
@@ -31,7 +31,6 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.hcatalog.api.ConnectionFailureException;
 import org.apache.hive.hcatalog.api.HCatClient;
@@ -279,8 +278,9 @@ public class HCatURIHandler implements URIHandler {
                     ugi.addToken(token);
                 }
                 finally {
-                    if (tokenClient != null)
+                    if (tokenClient != null) {
                         tokenClient.close();
+                    }
                 }
             }
             XLog.getLog(HCatURIHandler.class).info(
@@ -415,10 +415,18 @@ public class HCatURIHandler implements URIHandler {
         @Override
         public void destroy() {
             try {
-                hcatClient.close();
+                if (delegationToken != null && !delegationToken.isEmpty()) {
+                    hcatClient.cancelDelegationToken(delegationToken);
+                }
                 delegationToken = null;
             }
             catch (Exception ignore) {
+                XLog.getLog(HCatContext.class).warn("Error cancelling 
delegation token", ignore);
+            }
+            try {
+                hcatClient.close();
+            }
+            catch (Exception ignore) {
                 XLog.getLog(HCatContext.class).warn("Error closing hcat 
client", ignore);
             }
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/39e2ed5d/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index bf84976..d533d78 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2358 Coord rerun cleanup should reuse hcat connections (rohini)
 OOZIE-2356 Add a way to enable/disable credentials in a workflow (rkanter)
 OOZIE-2355 Hive2 Action doesn't pass along oozie configs to jobconf (rkanter)
 OOZIE-2318 Provide better solution for specifying SSL truststore to Oozie 
Client (rkanter)

Reply via email to