Repository: oozie
Updated Branches:
  refs/heads/master 3cc71c018 -> 01cb4d555


OOZIE-1985 support dropping hcat dataset in coord rerun with cleanup option 
(ryota)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/01cb4d55
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/01cb4d55
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/01cb4d55

Branch: refs/heads/master
Commit: 01cb4d55538ced23ea79744403f22fb5c9811c17
Parents: 3cc71c0
Author: egashira <[email protected]>
Authored: Thu Nov 20 09:41:44 2014 -0800
Committer: egashira <[email protected]>
Committed: Thu Nov 20 09:41:44 2014 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/oozie/ErrorCode.java   |   1 +
 .../oozie/command/coord/CoordRerunXCommand.java |  60 +++++--
 .../apache/oozie/coord/CoordELFunctions.java    |   4 +-
 .../apache/oozie/dependency/FSURIHandler.java   |  33 +++-
 .../apache/oozie/dependency/HCatURIHandler.java | 173 ++++++++++++++++---
 .../org/apache/oozie/dependency/URIHandler.java |  23 ++-
 .../command/coord/TestCoordRerunXCommand.java   | 138 ++++++++++++++-
 .../oozie/dependency/TestHCatURIHandler.java    |   7 +
 release-log.txt                                 |   1 +
 9 files changed, 387 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/main/java/org/apache/oozie/ErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java 
b/core/src/main/java/org/apache/oozie/ErrorCode.java
index 4afeb6c..38bc790 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -182,6 +182,7 @@ public enum ErrorCode {
     E0904(XLog.STD, "Scheme [{0}] not supported in uri [{1}]"),
     E0905(XLog.STD, "Scheme not present in uri [{0}]"),
     E0906(XLog.STD, "URI parsing error : {0}"),
+    E0907(XLog.STD, "Failed to delete uri : {0}"),
 
     E1001(XLog.STD, "Could not read the coordinator job definition, {0}"),
     E1002(XLog.STD, "Invalid coordinator application URI [{0}], {1}"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java 
b/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
index 098eb4b..2eefdb8 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
@@ -20,18 +20,21 @@ package org.apache.oozie.command.coord;
 
 import java.io.IOException;
 import java.io.StringReader;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorActionInfo;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.XException;
-import org.apache.oozie.action.ActionExecutorException;
-import org.apache.oozie.action.hadoop.FsActionExecutor;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.Job;
@@ -43,6 +46,9 @@ import org.apache.oozie.command.RerunTransitionXCommand;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.coord.CoordUtils;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.dependency.URIHandler.Context;
+import org.apache.oozie.dependency.URIHandlerException;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
@@ -50,6 +56,8 @@ import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.sla.SLAOperations;
 import org.apache.oozie.sla.service.SLAService;
 import org.apache.oozie.util.InstrumentUtils;
@@ -128,7 +136,8 @@ public class CoordRerunXCommand extends 
RerunTransitionXCommand<CoordinatorActio
      * @param group group name
      */
     @SuppressWarnings("unchecked")
-    private void cleanupOutputEvents(Element eAction, String user, String 
group) {
+    private void cleanupOutputEvents(Element eAction, String user, String 
group, CoordinatorAction action)
+            throws CommandException {
         Element outputList = eAction.getChild("output-events", 
eAction.getNamespace());
         if (outputList != null) {
             for (Element data : (List<Element>) 
outputList.getChildren("data-out", eAction.getNamespace())) {
@@ -136,15 +145,40 @@ public class CoordRerunXCommand extends 
RerunTransitionXCommand<CoordinatorActio
                     String uris = data.getChild("uris", 
data.getNamespace()).getTextTrim();
                     if (uris != null) {
                         String[] uriArr = 
uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
-                        FsActionExecutor fsAe = new FsActionExecutor();
-                        for (String uri : uriArr) {
-                            Path path = new Path(uri);
-                            try {
-                                fsAe.delete(user, group, path);
-                                LOG.debug("Cleanup the output dir " + path);
+                        Configuration actionConf = null;
+                        try {
+                            actionConf = new XConfiguration(new 
StringReader(coordJob.getConf()));
+                        }
+                        catch (IOException e) {
+                            throw new CommandException(ErrorCode.E0907,
+                                    "failed to read coord job conf to clean up 
output data");
+                        }
+                        HashMap<String, Context> contextMap = new 
HashMap<String, Context>();
+                        try {
+                            for (String uriStr : uriArr) {
+                                URI uri = new URI(uriStr);
+                                URIHandler handler = 
Services.get().get(URIHandlerService.class).getURIHandler(uri);
+                                String schemeWithAuthority = uri.getScheme() + 
"://" + uri.getAuthority();
+                                if 
(!contextMap.containsKey(schemeWithAuthority)) {
+                                    Context context = handler.getContext(uri, 
actionConf, user, false);
+                                    contextMap.put(schemeWithAuthority, 
context);
+                                }
+                                handler.delete(uri, 
contextMap.get(schemeWithAuthority));
+                                LOG.info("Cleanup the output data " + 
uri.toString());
                             }
-                            catch (ActionExecutorException ae) {
-                                LOG.warn("Failed to cleanup the output dir " + 
uri, ae);
+                        }
+                        catch (URISyntaxException e) {
+                            throw new CommandException(ErrorCode.E0907, 
e.getMessage());
+                        }
+                        catch (URIHandlerException e) {
+                            throw new CommandException(ErrorCode.E0907, 
e.getMessage());
+                        }
+                        finally {
+                            Iterator<Entry<String, Context>> itr = 
contextMap.entrySet().iterator();
+                            while (itr.hasNext()) {
+                                Entry<String, Context> entry = itr.next();
+                                entry.getValue().destroy();
+                                itr.remove();
                             }
                         }
                     }
@@ -312,7 +346,7 @@ public class CoordRerunXCommand extends 
RerunTransitionXCommand<CoordinatorActio
                     String actionXml = coordAction.getActionXml();
                     if (!noCleanup) {
                         Element eAction = XmlUtils.parseXml(actionXml);
-                        cleanupOutputEvents(eAction, coordJob.getUser(), 
coordJob.getGroup());
+                        cleanupOutputEvents(eAction, coordJob.getUser(), 
coordJob.getGroup(), coordAction);
                     }
                     if (refresh) {
                         refreshAction(coordJob, coordAction);

http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java 
b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
index 3bb191e..7f59186 100644
--- a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
+++ b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
@@ -317,7 +317,7 @@ public class CoordELFunctions {
                     if (uriHandler == null) {
                         URI uri = new URI(uriPath);
                         uriHandler = uriService.getURIHandler(uri);
-                        uriContext = uriHandler.getContext(uri, conf, user);
+                        uriContext = uriHandler.getContext(uri, conf, user, 
true);
                     }
                     String uriWithDoneFlag = 
uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
                     if (uriHandler.exists(new URI(uriWithDoneFlag), 
uriContext)) {
@@ -1075,7 +1075,7 @@ public class CoordELFunctions {
                     if (uriHandler == null) {
                         URI uri = new URI(uriPath);
                         uriHandler = uriService.getURIHandler(uri);
-                        uriContext = uriHandler.getContext(uri, conf, user);
+                        uriContext = uriHandler.getContext(uri, conf, user, 
true);
                     }
                     String uriWithDoneFlag = 
uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
                     if (uriHandler.exists(new URI(uriWithDoneFlag), 
uriContext)) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java 
b/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
index a8f548a..7c1aadf 100644
--- a/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
+++ b/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
@@ -78,7 +78,7 @@ public class FSURIHandler implements URIHandler {
     }
 
     @Override
-    public Context getContext(URI uri, Configuration conf, String user) throws 
URIHandlerException {
+    public Context getContext(URI uri, Configuration conf, String user, 
boolean readOnly) throws URIHandlerException {
         FileSystem fs = getFileSystem(uri, conf, user);
         return new FSContext(conf, user, fs);
     }
@@ -122,6 +122,37 @@ public class FSURIHandler implements URIHandler {
 
     }
 
+    @Override
+    public void delete(URI uri, Context context) throws URIHandlerException {
+        FileSystem fs = ((FSContext) context).getFileSystem();
+        Path path = new Path(uri);
+        try {
+            if (fs.exists(path)) {
+                if (!fs.delete(path, true)) {
+                    throw new URIHandlerException(ErrorCode.E0907, 
path.toString());
+                }
+            }
+        }
+        catch (IOException e) {
+            throw new URIHandlerException(ErrorCode.E0907, path.toString());
+        }
+    }
+
+    @Override
+    public void delete(URI uri, Configuration conf, String user) throws 
URIHandlerException {
+        Path path = new Path(uri);
+        FileSystem fs = getFileSystem(uri, conf, user);
+        try{
+            if (fs.exists(path)) {
+                if (!fs.delete(path, true)) {
+                    throw new URIHandlerException(ErrorCode.E0907, 
path.toString());
+                }
+            }
+        } catch (IOException e){
+            throw new URIHandlerException(ErrorCode.E0907, path.toString());
+        }
+    }
+
     private Path getNormalizedPath(URI uri) {
         // Normalizes uri path replacing // with / in the path which users 
specify by mistake
         return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());

http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java 
b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
index 629033b..0e690a0 100644
--- a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
+++ b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
@@ -21,6 +21,7 @@ package org.apache.oozie.dependency;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,6 +31,7 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.hcatalog.api.ConnectionFailureException;
 import org.apache.hive.hcatalog.api.HCatClient;
@@ -106,7 +108,7 @@ public class HCatURIHandler implements URIHandler {
         }
         HCatAccessorService hcatService = 
Services.get().get(HCatAccessorService.class);
         if (!hcatService.isRegisteredForNotification(hcatURI)) {
-            HCatClient client = getHCatClient(uri, conf, user);
+            HCatClient client = getHCatClient(uri, conf);
             try {
                 String topic = client.getMessageBusTopicName(hcatURI.getDb(), 
hcatURI.getTable());
                 if (topic == null) {
@@ -118,7 +120,7 @@ public class HCatURIHandler implements URIHandler {
                 throw new HCatAccessorException(ErrorCode.E1501, e);
             }
             finally {
-                closeQuietly(client, true);
+                closeQuietly(client, null, true);
             }
         }
         PartitionDependencyManagerService pdmService = 
Services.get().get(PartitionDependencyManagerService.class);
@@ -139,9 +141,20 @@ public class HCatURIHandler implements URIHandler {
     }
 
     @Override
-    public Context getContext(URI uri, Configuration conf, String user) throws 
URIHandlerException {
-        HCatClient client = getHCatClient(uri, conf, user);
-        return new HCatContext(conf, user, client);
+    public Context getContext(URI uri, Configuration conf, String user, 
boolean readOnly)
+            throws URIHandlerException {
+        HCatContext context = null;
+        // Read operations are allowed for any user in HCat, so they are done as
+        // the Oozie server itself. For write operations, perform doAs as the user.
+        if (readOnly) {
+            HCatClient client = getHCatClient(uri, conf);
+            context = new HCatContext(conf, user, client);
+        }
+        else {
+            HCatClientWithToken client = getHCatClient(uri, conf, user);
+            context = new HCatContext(conf, user, client);
+        }
+        return context;
     }
 
     @Override
@@ -152,11 +165,45 @@ public class HCatURIHandler implements URIHandler {
 
     @Override
     public boolean exists(URI uri, Configuration conf, String user) throws 
URIHandlerException {
-        HCatClient client = getHCatClient(uri, conf, user);
+        HCatClient client = getHCatClient(uri, conf);
         return exists(uri, client, true);
     }
 
     @Override
+    public void delete(URI uri, Context context) throws URIHandlerException {
+        HCatClient client = ((HCatContext) context).getHCatClient();
+        try {
+            HCatURI hcatUri  = new HCatURI(uri);
+            client.dropPartitions(hcatUri.getDb(), hcatUri.getTable(), 
hcatUri.getPartitionMap(), true);
+        }
+        catch (URISyntaxException e) {
+            throw new HCatAccessorException(ErrorCode.E1501, e);
+        }
+        catch (HCatException e) {
+            throw new HCatAccessorException(ErrorCode.E1501, e);
+        }
+    }
+
+    @Override
+    public void delete(URI uri, Configuration conf, String user) throws 
URIHandlerException {
+        HCatClientWithToken client = null;
+        try {
+            HCatURI hcatUri = new HCatURI(uri);
+            client = getHCatClient(uri, conf, user);
+            client.getHCatClient().dropPartitions(hcatUri.getDb(), 
hcatUri.getTable(), hcatUri.getPartitionMap(), true);
+        }
+        catch (URISyntaxException e){
+            throw new HCatAccessorException(ErrorCode.E1501, e);
+        }
+        catch (HCatException e) {
+            throw new HCatAccessorException(ErrorCode.E1501, e);
+        }
+        finally {
+            closeQuietly(client.getHCatClient(), 
client.getDelegationToken(),true);
+        }
+    }
+
+    @Override
     public String getURIWithDoneFlag(String uri, String doneFlag) throws 
URIHandlerException {
         return uri;
     }
@@ -177,34 +224,25 @@ public class HCatURIHandler implements URIHandler {
 
     }
 
-    private HCatClient getHCatClient(URI uri, Configuration conf, String user) 
throws HCatAccessorException {
+    private HiveConf getHiveConf(URI uri, Configuration conf){
         HCatAccessorService hcatService = 
Services.get().get(HCatAccessorService.class);
         if (hcatService.getHCatConf() != null) {
             conf = hcatService.getHCatConf();
         }
-        final HiveConf hiveConf = new HiveConf(conf, this.getClass());
+        HiveConf hiveConf = new HiveConf(conf, this.getClass());
         String serverURI = getMetastoreConnectURI(uri);
         if (!serverURI.equals("")) {
             hiveConf.set("hive.metastore.local", "false");
         }
         hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
-        try {
-            XLog.getLog(HCatURIHandler.class).info(
-                    "Creating HCatClient for user [{0}] login_user [{1}] and 
server [{2}] ", user,
-                    UserGroupInformation.getLoginUser(), serverURI);
-
-            // HiveMetastoreClient (hive 0.9) currently does not work if UGI 
has doAs
-            // We are good to connect as the oozie user since listPartitions 
does not require
-            // authorization
-            /*
-            UserGroupInformation ugi = ugiService.getProxyUser(user);
-            return ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
-                public HCatClient run() throws Exception {
-                    return HCatClient.create(hiveConf);
-                }
-            });
-            */
+        return hiveConf;
+    }
 
+    private HCatClient getHCatClient(URI uri, Configuration conf) throws 
HCatAccessorException {
+        HiveConf hiveConf = getHiveConf(uri, conf);
+        try {
+            XLog.getLog(HCatURIHandler.class).info("Creating HCatClient for 
login_user [{0}] and server [{1}] ",
+                    UserGroupInformation.getLoginUser(), 
hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
             return HCatClient.create(hiveConf);
         }
         catch (HCatException e) {
@@ -213,7 +251,52 @@ public class HCatURIHandler implements URIHandler {
         catch (IOException e) {
             throw new HCatAccessorException(ErrorCode.E1501, e);
         }
+    }
 
+    private HCatClientWithToken getHCatClient(URI uri, Configuration conf, 
String user)
+            throws HCatAccessorException {
+        final HiveConf hiveConf = getHiveConf(uri, conf);
+        String delegationToken = null;
+        try {
+            // Get UGI to doAs() as the specified user
+            UserGroupInformation ugi = 
UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
+            // Define the label for the Delegation Token for the HCat instance.
+            hiveConf.set("hive.metastore.token.signature", 
"HCatTokenSignature");
+            if 
(hiveConf.getBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, 
false)) {
+                HCatClient tokenClient = null;
+                try {
+                    // Retrieve Delegation token for HCatalog
+                    tokenClient = HCatClient.create(hiveConf);
+                    delegationToken = tokenClient.getDelegationToken(user, 
UserGroupInformation.getLoginUser()
+                            .getUserName());
+                    // Store Delegation token in the UGI
+                    ShimLoader.getHadoopShims().setTokenStr(ugi, 
delegationToken,
+                            hiveConf.get("hive.metastore.token.signature"));
+                }
+                finally {
+                    if (tokenClient != null)
+                        tokenClient.close();
+                }
+            }
+            XLog.getLog(HCatURIHandler.class).info(
+                    "Creating HCatClient for user [{0}] login_user [{1}] and 
server [{2}] ", user,
+                    UserGroupInformation.getLoginUser(), 
hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
+            HCatClient hcatClient = ugi.doAs(new 
PrivilegedExceptionAction<HCatClient>() {
+                @Override
+                public HCatClient run() throws Exception {
+                    HCatClient client = HCatClient.create(hiveConf);
+                    return client;
+                }
+            });
+            HCatClientWithToken clientWithToken = new 
HCatClientWithToken(hcatClient, delegationToken);
+            return clientWithToken;
+        }
+        catch (IOException e) {
+            throw new HCatAccessorException(ErrorCode.E1501, e.getMessage());
+        }
+        catch (Exception e) {
+            throw new HCatAccessorException(ErrorCode.E1501, e.getMessage());
+        }
     }
 
     private String getMetastoreConnectURI(URI uri) {
@@ -247,13 +330,16 @@ public class HCatURIHandler implements URIHandler {
             throw new HCatAccessorException(ErrorCode.E0902, e);
         }
         finally {
-            closeQuietly(client, closeClient);
+            closeQuietly(client, null, closeClient);
         }
     }
 
-    private void closeQuietly(HCatClient client, boolean close) {
+    private void closeQuietly(HCatClient client, String delegationToken, 
boolean close) {
         if (close && client != null) {
             try {
+                if(delegationToken != null && !delegationToken.isEmpty()) {
+                    client.cancelDelegationToken(delegationToken);
+                }
                 client.close();
             }
             catch (Exception ignore) {
@@ -262,9 +348,28 @@ public class HCatURIHandler implements URIHandler {
         }
     }
 
+    class HCatClientWithToken {
+        private HCatClient hcatClient;
+        private String token;
+
+        public HCatClientWithToken(HCatClient client, String delegationToken) {
+            this.hcatClient = client;
+            this.token = delegationToken;
+        }
+
+        public HCatClient getHCatClient() {
+            return this.hcatClient;
+        }
+
+        public String getDelegationToken() {
+            return this.token;
+        }
+    }
+
     static class HCatContext extends Context {
 
         private HCatClient hcatClient;
+        private String delegationToken;
 
         /**
          * Create a HCatContext that can be used to access a hcat URI
@@ -278,6 +383,12 @@ public class HCatURIHandler implements URIHandler {
             this.hcatClient = hcatClient;
         }
 
+        public HCatContext(Configuration conf, String user, 
HCatClientWithToken hcatClient) {
+            super(conf, user);
+            this.hcatClient = hcatClient.getHCatClient();
+            this.delegationToken = hcatClient.getDelegationToken();
+        }
+
         /**
          * Get the HCatClient to talk to hcatalog server
          *
@@ -287,10 +398,20 @@ public class HCatURIHandler implements URIHandler {
             return hcatClient;
         }
 
+        /**
+         * Get the Delegation token to access HCat
+         *
+         * @return delegationToken
+         */
+        public String getDelegationToken() {
+            return delegationToken;
+        }
+
         @Override
         public void destroy() {
             try {
                 hcatClient.close();
+                delegationToken = null;
             }
             catch (Exception ignore) {
                 XLog.getLog(HCatContext.class).warn("Error closing hcat 
client", ignore);

http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/URIHandler.java 
b/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
index 7280902..6e54d4b 100644
--- a/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
+++ b/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
@@ -104,12 +104,12 @@ public interface URIHandler {
      * @param uri URI which identifies the scheme and host
      * @param conf Configuration to access the URI
      * @param user name of the user the URI should be accessed as
-     *
+     * @param readOnly indicates whether the context is used for read-only operations
      * @return Context to access URIs with same scheme and host
      *
      * @throws URIHandlerException
      */
-    public Context getContext(URI uri, Configuration conf, String user) throws 
URIHandlerException;
+    public Context getContext(URI uri, Configuration conf, String user, 
boolean readOnly) throws URIHandlerException;
 
     /**
      * Check if the dependency identified by the URI is available
@@ -140,6 +140,25 @@ public interface URIHandler {
     public boolean exists(URI uri, Configuration conf, String user) throws 
URIHandlerException;
 
     /**
+     * Delete the resource identified by the URI, using an existing context
+     *
+     * @param uri URI
+     * @param context Context to access the URI
+     * @throws URIHandlerException
+     */
+    public void delete(URI uri, Context context) throws URIHandlerException;
+
+    /**
+     * Delete the resource identified by the URI, accessing it as the given user
+     *
+     * @param uri URI
+     * @param conf Configuration to access the URI
+     * @param user name of the user the URI should be accessed as
+     * @throws URIHandlerException
+     */
+    public void delete(URI uri, Configuration conf, String user) throws 
URIHandlerException;
+
+    /**
      * Get the URI based on the done flag
      *
      * @param uri URI of the dependency

http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
index ac023ca..2730aa6 100644
--- 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
@@ -18,6 +18,19 @@
 
 package org.apache.oozie.command.coord;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.net.URI;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.CoordinatorActionBean;
@@ -29,10 +42,25 @@ import org.apache.oozie.client.CoordinatorJob.Execution;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.coord.CoordELFunctions;
-import org.apache.oozie.executor.jpa.*;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.local.LocalOozie;
-import org.apache.oozie.service.*;
+import org.apache.oozie.service.UUIDService;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.SchemaService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.StatusTransitService;
+import org.apache.oozie.service.StoreService;
+import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.store.CoordinatorStore;
 import org.apache.oozie.store.StoreException;
 import org.apache.oozie.test.XDataTestCase;
@@ -43,12 +71,6 @@ import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
 import org.jdom.JDOMException;
 
-import java.io.*;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-import java.util.regex.Matcher;
-
 public class TestCoordRerunXCommand extends XDataTestCase {
     private Services services;
 
@@ -620,6 +642,69 @@ public class TestCoordRerunXCommand extends XDataTestCase {
     }
 
     /**
+     * Test : rerun with cleanup option when output dependency is hcat partition
+     *
+     * @throws Exception
+     */
+    public void testCoordRerunCleanupForHCat() throws Exception {
+
+        services = super.setupServicesForHCatalog();
+        services.init();
+
+        final String jobId = "0000000-" + new Date().getTime() + 
"-testCoordRerun-C";
+        final int actionNum = 1;
+        final String actionId = jobId + "@" + actionNum;
+        CoordinatorStore store = 
Services.get().get(StoreService.class).getStore(CoordinatorStore.class);
+        store.beginTrx();
+        try {
+            addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED);
+            addRecordToActionTable(jobId, actionNum, actionId, store, 
CoordinatorAction.Status.SUCCEEDED,
+                    "coord-rerun-action1.xml", true);
+            store.commitTrx();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail("Could not update db.");
+        }
+        finally {
+            store.closeTrx();
+        }
+
+        String db = "mydb";
+        String table = "mytable";
+        String server = getHCatalogServer().getMetastoreAuthority();
+        String newHCatDependency = "hcat://" + server + "/" + db + "/" + table 
+ "/ds=2009121411;region=usa";
+
+        dropTable(db, table, true);
+        dropDatabase(db, true);
+        createDatabase(db);
+        createTable(db, table, "ds,region");
+        addPartition(db, table, "ds=2009121411;region=usa");
+
+        // before cleanup: the hcat partition must exist
+        Configuration conf = new Configuration();
+        URIHandler handler = 
services.get(URIHandlerService.class).getURIHandler(newHCatDependency);
+        assertTrue(handler.exists(new URI(newHCatDependency), conf, 
getTestUser()));
+
+        final OozieClient coordClient = LocalOozie.getCoordClient();
+        coordClient.reRunCoord(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, 
Integer.toString(actionNum), false, false);
+
+        CoordinatorActionBean action2 = 
CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, 
actionId);
+        assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
+
+        waitFor(120 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                CoordinatorAction bean = 
coordClient.getCoordActionInfo(actionId);
+                return (bean.getStatus() == CoordinatorAction.Status.WAITING 
|| bean.getStatus() == CoordinatorAction.Status.READY);
+            }
+        });
+
+        // after cleanup: the hcat partition must no longer exist
+        assertFalse(handler.exists(new URI(newHCatDependency), conf, 
getTestUser()));
+    }
+
+    /**
      * Test : rerun <jobId> -action 1 with no output-event
      *
      * @throws Exception
@@ -1066,8 +1151,19 @@ public class TestCoordRerunXCommand extends 
XDataTestCase {
 
     private void addRecordToActionTable(String jobId, int actionNum, String 
actionId, CoordinatorStore store,
             CoordinatorAction.Status status, String resourceXmlName) throws 
StoreException, IOException {
+        addRecordToActionTable(jobId, actionNum, actionId, store, status, 
resourceXmlName, false);
+    }
+
+    private void addRecordToActionTable(String jobId, int actionNum, String 
actionId, CoordinatorStore store,
+            CoordinatorAction.Status status, String resourceXmlName, boolean 
isHCatDep) throws StoreException,
+            IOException {
         Path appPath = new Path(getFsTestCaseDir(), "coord");
-        String actionXml = getCoordActionXml(appPath, resourceXmlName);
+        String actionXml = null;
+        if(isHCatDep != true) {
+            actionXml = getCoordActionXml(appPath, resourceXmlName);
+        } else {
+            actionXml = getCoordActionXmlForHCat(appPath, resourceXmlName);
+        }
         String actionNomialTime = getActionNomialTime(actionXml);
 
         CoordinatorActionBean action = new CoordinatorActionBean();
@@ -1185,6 +1281,30 @@ public class TestCoordRerunXCommand extends 
XDataTestCase {
         }
     }
 
+    protected String getCoordActionXmlForHCat(Path appPath, String 
resourceXmlName) {
+        String hcatServer = getHCatalogServer().getMetastoreAuthority();
+        String inputTemplate = "hcat://" + hcatServer + 
"/mydb/mytable/ds=${YEAR}${MONTH}${DAY}${HOUR};region=usa";
+        inputTemplate = Matcher.quoteReplacement(inputTemplate);
+        String outputTemplate = "hcat://" + hcatServer + 
"/mydb/mytable/ds=${YEAR}${MONTH}${DAY}${HOUR};region=usa";
+        outputTemplate = Matcher.quoteReplacement(outputTemplate);
+        String inputDir = "hcat://" + hcatServer + 
"/mydb/mytable/ds=2010070501;region=usa";
+        inputDir = Matcher.quoteReplacement(inputDir);
+        String outputDir = "hcat://" + hcatServer + 
"/mydb/mytable/ds=2009121411;region=usa";
+        outputDir = Matcher.quoteReplacement(outputDir);
+        try {
+            Reader reader = IOUtils.getResourceAsReader(resourceXmlName, -1);
+            String appXml = IOUtils.getReaderAsString(reader, -1);
+            appXml = appXml.replaceAll("#inputTemplate", inputTemplate);
+            appXml = appXml.replaceAll("#outputTemplate", outputTemplate);
+            appXml = appXml.replaceAll("#inputDir", inputDir);
+            appXml = appXml.replaceAll("#outputDir", outputDir);
+            return appXml;
+        }
+        catch (IOException ioe) {
+            throw new RuntimeException(XLog.format("Could not get " + 
resourceXmlName, ioe));
+        }
+    }
+
     private String getActionNomialTime(String actionXml) {
         Element eAction;
         try {

http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java 
b/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java
index 31a68bf..a49eba5 100644
--- a/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java
+++ b/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java
@@ -95,6 +95,13 @@ public class TestHCatURIHandler extends XHCatTestCase {
         hcatURI = getHCatURI(db, table, "month=02;dt=02");
         assertFalse(handler.exists(hcatURI, conf, getTestUser()));
 
+        addPartition(db, table, "year=2012;month=12;dt=04;country=us");
+
+        hcatURI = getHCatURI(db, table, "country=us;year=2012;month=12;dt=04");
+        assertTrue(handler.exists(hcatURI, conf, getTestUser()));
+        ((HCatURIHandler)handler).delete(hcatURI, conf, getTestUser());
+        assertFalse(handler.exists(hcatURI, conf, getTestUser()));
+
         dropTestTable();
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 695e1dd..86398ee 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-1985 support dropping hcat dataset in coord rerun with cleanup option 
(ryota)
 OOZIE-2053 Change old HCatalog API (ryota)
 OOZIE-2064 coord job with frequency coord:endOfMonths doesn't materialize 
(puru)
 OOZIE-2063 Cron syntax creates duplicate actions (bzhang)

Reply via email to