Repository: oozie Updated Branches: refs/heads/master 3cc71c018 -> 01cb4d555
OOZIE-1985 support dropping hcat dataset in coord rerun with cleanup option (ryota) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/01cb4d55 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/01cb4d55 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/01cb4d55 Branch: refs/heads/master Commit: 01cb4d55538ced23ea79744403f22fb5c9811c17 Parents: 3cc71c0 Author: egashira <[email protected]> Authored: Thu Nov 20 09:41:44 2014 -0800 Committer: egashira <[email protected]> Committed: Thu Nov 20 09:41:44 2014 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/oozie/ErrorCode.java | 1 + .../oozie/command/coord/CoordRerunXCommand.java | 60 +++++-- .../apache/oozie/coord/CoordELFunctions.java | 4 +- .../apache/oozie/dependency/FSURIHandler.java | 33 +++- .../apache/oozie/dependency/HCatURIHandler.java | 173 ++++++++++++++++--- .../org/apache/oozie/dependency/URIHandler.java | 23 ++- .../command/coord/TestCoordRerunXCommand.java | 138 ++++++++++++++- .../oozie/dependency/TestHCatURIHandler.java | 7 + release-log.txt | 1 + 9 files changed, 387 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/main/java/org/apache/oozie/ErrorCode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index 4afeb6c..38bc790 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -182,6 +182,7 @@ public enum ErrorCode { E0904(XLog.STD, "Scheme [{0}] not supported in uri [{1}]"), E0905(XLog.STD, "Scheme not present in uri [{0}]"), E0906(XLog.STD, "URI parsing error : {0}"), + E0907(XLog.STD, "Failed to delete uri : {0}"), E1001(XLog.STD, "Could not read the coordinator job definition, {0}"), E1002(XLog.STD, "Invalid coordinator application URI [{0}], {1}"), http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java index 098eb4b..2eefdb8 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java @@ -20,18 +20,21 @@ package org.apache.oozie.command.coord; import java.io.IOException; import java.io.StringReader; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorActionInfo; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.SLAEventBean; import org.apache.oozie.XException; -import org.apache.oozie.action.ActionExecutorException; -import org.apache.oozie.action.hadoop.FsActionExecutor; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.CoordinatorJob; import org.apache.oozie.client.Job; @@ -43,6 +46,9 @@ import org.apache.oozie.command.RerunTransitionXCommand; import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; import org.apache.oozie.coord.CoordELFunctions; import org.apache.oozie.coord.CoordUtils; +import org.apache.oozie.dependency.URIHandler; +import org.apache.oozie.dependency.URIHandler.Context; +import org.apache.oozie.dependency.URIHandlerException; import org.apache.oozie.executor.jpa.BatchQueryExecutor; import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; @@ -50,6 +56,8 @@ import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.service.EventHandlerService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.URIHandlerService; import org.apache.oozie.sla.SLAOperations; import org.apache.oozie.sla.service.SLAService; import org.apache.oozie.util.InstrumentUtils; @@ -128,7 +136,8 @@ public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActio * @param group group name */ @SuppressWarnings("unchecked") - private void cleanupOutputEvents(Element eAction, String user, String group) { + private void cleanupOutputEvents(Element eAction, String user, String group, CoordinatorAction action) + throws CommandException { Element outputList = eAction.getChild("output-events", eAction.getNamespace()); if (outputList != null) { for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { @@ -136,15 +145,40 @@ public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActio String uris = data.getChild("uris", data.getNamespace()).getTextTrim(); if (uris != null) { String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR); - FsActionExecutor fsAe = new FsActionExecutor(); - for (String uri : uriArr) { - Path path = new Path(uri); - try { - fsAe.delete(user, group, path); - LOG.debug("Cleanup the output dir " + path); + Configuration actionConf = null; + try { + actionConf = new XConfiguration(new StringReader(coordJob.getConf())); + } + catch (IOException e) { + throw new CommandException(ErrorCode.E0907, + "failed to read coord job conf to clean up output data"); + } + HashMap<String, Context> contextMap = new HashMap<String, Context>(); + try { + for (String uriStr : uriArr) { + URI uri = new URI(uriStr); + URIHandler handler = Services.get().get(URIHandlerService.class).getURIHandler(uri); + String schemeWithAuthority = uri.getScheme() + "://" + uri.getAuthority(); + if (!contextMap.containsKey(schemeWithAuthority)) { + Context context = handler.getContext(uri, actionConf, user, false); + contextMap.put(schemeWithAuthority, context); + } + handler.delete(uri, contextMap.get(schemeWithAuthority)); + LOG.info("Cleanup the output data " + uri.toString()); } - catch (ActionExecutorException ae) { - LOG.warn("Failed to cleanup the output dir " + uri, ae); + } + catch (URISyntaxException e) { + throw new CommandException(ErrorCode.E0907, e.getMessage()); + } + catch (URIHandlerException e) { + throw new CommandException(ErrorCode.E0907, e.getMessage()); + } + finally { + Iterator<Entry<String, Context>> itr = contextMap.entrySet().iterator(); + while (itr.hasNext()) { + Entry<String, Context> entry = itr.next(); + entry.getValue().destroy(); + itr.remove(); } } } @@ -312,7 +346,7 @@ public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActio String actionXml = coordAction.getActionXml(); if (!noCleanup) { Element eAction = XmlUtils.parseXml(actionXml); - cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup()); + cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup(), coordAction); } if (refresh) { refreshAction(coordJob, coordAction); http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java index 3bb191e..7f59186 100644 --- a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java +++ b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java @@ -317,7 +317,7 @@ public class CoordELFunctions { if (uriHandler == null) { URI uri = new URI(uriPath); uriHandler = uriService.getURIHandler(uri); - uriContext = uriHandler.getContext(uri, conf, user); + uriContext = uriHandler.getContext(uri, conf, user, true); } String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag); if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) { @@ -1075,7 +1075,7 @@ public class CoordELFunctions { if (uriHandler == null) { URI uri = new URI(uriPath); uriHandler = uriService.getURIHandler(uri); - uriContext = uriHandler.getContext(uri, conf, user); + uriContext = uriHandler.getContext(uri, conf, user, true); } String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag); if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) { http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java b/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java index a8f548a..7c1aadf 100644 --- a/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java +++ b/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java @@ -78,7 +78,7 @@ public class FSURIHandler implements URIHandler { } @Override - public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException { + public Context getContext(URI uri, Configuration conf, String user, boolean readOnly) throws URIHandlerException { FileSystem fs = getFileSystem(uri, conf, user); return new FSContext(conf, user, fs); } @@ -122,6 +122,37 @@ public class FSURIHandler implements URIHandler { } + @Override + public void delete(URI uri, Context context) throws URIHandlerException { + FileSystem fs = ((FSContext) context).getFileSystem(); + Path path = new Path(uri); + try { + if (fs.exists(path)) { + if (!fs.delete(path, true)) { + throw new URIHandlerException(ErrorCode.E0907, path.toString()); + } + } + } + catch (IOException e) { + throw new URIHandlerException(ErrorCode.E0907, path.toString()); + } + } + + @Override + public void delete(URI uri, Configuration conf, String user) throws URIHandlerException { + Path path = new Path(uri); + FileSystem fs = getFileSystem(uri, conf, user); + try{ + if (fs.exists(path)) { + if (!fs.delete(path, true)) { + throw new URIHandlerException(ErrorCode.E0907, path.toString()); + } + } + } catch (IOException e){ + throw new URIHandlerException(ErrorCode.E0907, path.toString()); + } + } + private Path getNormalizedPath(URI uri) { // Normalizes uri path replacing // with / in the path which users specify by mistake return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath()); http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java index 629033b..0e690a0 100644 --- a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java +++ b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java @@ -21,6 +21,7 @@ package org.apache.oozie.dependency; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -30,6 +31,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.hcatalog.api.ConnectionFailureException; import org.apache.hive.hcatalog.api.HCatClient; @@ -106,7 +108,7 @@ public class HCatURIHandler implements URIHandler { } HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); if (!hcatService.isRegisteredForNotification(hcatURI)) { - HCatClient client = getHCatClient(uri, conf, user); + HCatClient client = getHCatClient(uri, conf); try { String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable()); if (topic == null) { @@ -118,7 +120,7 @@ public class HCatURIHandler implements URIHandler { throw new HCatAccessorException(ErrorCode.E1501, e); } finally { - closeQuietly(client, true); + closeQuietly(client, null, true); } } PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class); @@ -139,9 +141,20 @@ public class HCatURIHandler implements URIHandler { } @Override - public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException { - HCatClient client = getHCatClient(uri, conf, user); - return new HCatContext(conf, user, client); + public Context getContext(URI uri, Configuration conf, String user, boolean readOnly) + throws URIHandlerException { + HCatContext context = null; + //read operations are allowed for any user in HCat and so accessing as Oozie server itself + //For write operations, perform doAs as user + if (readOnly) { + HCatClient client = getHCatClient(uri, conf); + context = new HCatContext(conf, user, client); + } + else { + HCatClientWithToken client = getHCatClient(uri, conf, user); + context = new HCatContext(conf, user, client); + } + return context; } @Override @@ -152,11 +165,45 @@ public class HCatURIHandler implements URIHandler { @Override public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException { - HCatClient client = getHCatClient(uri, conf, user); + HCatClient client = getHCatClient(uri, conf); return exists(uri, client, true); } @Override + public void delete(URI uri, Context context) throws URIHandlerException { + HCatClient client = ((HCatContext) context).getHCatClient(); + try { + HCatURI hcatUri = new HCatURI(uri); + client.dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true); + } + catch (URISyntaxException e) { + throw new HCatAccessorException(ErrorCode.E1501, e); + } + catch (HCatException e) { + throw new HCatAccessorException(ErrorCode.E1501, e); + } + } + + @Override + public void delete(URI uri, Configuration conf, String user) throws URIHandlerException { + HCatClientWithToken client = null; + try { + HCatURI hcatUri = new HCatURI(uri); + client = getHCatClient(uri, conf, user); + client.getHCatClient().dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true); + } + catch (URISyntaxException e){ + throw new HCatAccessorException(ErrorCode.E1501, e); + } + catch (HCatException e) { + throw new HCatAccessorException(ErrorCode.E1501, e); + } + finally { + closeQuietly(client.getHCatClient(), client.getDelegationToken(),true); + } + } + + @Override public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException { return uri; } @@ -177,34 +224,25 @@ public class HCatURIHandler implements URIHandler { } - private HCatClient getHCatClient(URI uri, Configuration conf, String user) throws HCatAccessorException { + private HiveConf getHiveConf(URI uri, Configuration conf){ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); if (hcatService.getHCatConf() != null) { conf = hcatService.getHCatConf(); } - final HiveConf hiveConf = new HiveConf(conf, this.getClass()); + HiveConf hiveConf = new HiveConf(conf, this.getClass()); String serverURI = getMetastoreConnectURI(uri); if (!serverURI.equals("")) { hiveConf.set("hive.metastore.local", "false"); } hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI); - try { - XLog.getLog(HCatURIHandler.class).info( - "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user, - UserGroupInformation.getLoginUser(), serverURI); - - // HiveMetastoreClient (hive 0.9) currently does not work if UGI has doAs - // We are good to connect as the oozie user since listPartitions does not require - // authorization - /* - UserGroupInformation ugi = ugiService.getProxyUser(user); - return ugi.doAs(new PrivilegedExceptionAction<HCatClient>() { - public HCatClient run() throws Exception { - return HCatClient.create(hiveConf); - } - }); - */ + return hiveConf; + } + private HCatClient getHCatClient(URI uri, Configuration conf) throws HCatAccessorException { + HiveConf hiveConf = getHiveConf(uri, conf); + try { + XLog.getLog(HCatURIHandler.class).info("Creating HCatClient for login_user [{0}] and server [{1}] ", + UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)); return HCatClient.create(hiveConf); } catch (HCatException e) { @@ -213,7 +251,52 @@ public class HCatURIHandler implements URIHandler { catch (IOException e) { throw new HCatAccessorException(ErrorCode.E1501, e); } + } + private HCatClientWithToken getHCatClient(URI uri, Configuration conf, String user) + throws HCatAccessorException { + final HiveConf hiveConf = getHiveConf(uri, conf); + String delegationToken = null; + try { + // Get UGI to doAs() as the specified user + UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); + // Define the label for the Delegation Token for the HCat instance. + hiveConf.set("hive.metastore.token.signature", "HCatTokenSignature"); + if (hiveConf.getBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, false)) { + HCatClient tokenClient = null; + try { + // Retrieve Delegation token for HCatalog + tokenClient = HCatClient.create(hiveConf); + delegationToken = tokenClient.getDelegationToken(user, UserGroupInformation.getLoginUser() + .getUserName()); + // Store Delegation token in the UGI + ShimLoader.getHadoopShims().setTokenStr(ugi, delegationToken, + hiveConf.get("hive.metastore.token.signature")); + } + finally { + if (tokenClient != null) + tokenClient.close(); + } + } + XLog.getLog(HCatURIHandler.class).info( + "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user, + UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)); + HCatClient hcatClient = ugi.doAs(new PrivilegedExceptionAction<HCatClient>() { + @Override + public HCatClient run() throws Exception { + HCatClient client = HCatClient.create(hiveConf); + return client; + } + }); + HCatClientWithToken clientWithToken = new HCatClientWithToken(hcatClient, delegationToken); + return clientWithToken; + } + catch (IOException e) { + throw new HCatAccessorException(ErrorCode.E1501, e.getMessage()); + } + catch (Exception e) { + throw new HCatAccessorException(ErrorCode.E1501, e.getMessage()); + } } private String getMetastoreConnectURI(URI uri) { @@ -247,13 +330,16 @@ public class HCatURIHandler implements URIHandler { throw new HCatAccessorException(ErrorCode.E0902, e); } finally { - closeQuietly(client, closeClient); + closeQuietly(client, null, closeClient); } } - private void closeQuietly(HCatClient client, boolean close) { + private void closeQuietly(HCatClient client, String delegationToken, boolean close) { if (close && client != null) { try { + if(delegationToken != null && !delegationToken.isEmpty()) { + client.cancelDelegationToken(delegationToken); + } client.close(); } catch (Exception ignore) { @@ -262,9 +348,28 @@ public class HCatURIHandler implements URIHandler { } } + class HCatClientWithToken { + private HCatClient hcatClient; + private String token; + + public HCatClientWithToken(HCatClient client, String delegationToken) { + this.hcatClient = client; + this.token = delegationToken; + } + + public HCatClient getHCatClient() { + return this.hcatClient; + } + + public String getDelegationToken() { + return this.token; + } + } + static class HCatContext extends Context { private HCatClient hcatClient; + private String delegationToken; /** * Create a HCatContext that can be used to access a hcat URI @@ -278,6 +383,12 @@ public class HCatURIHandler implements URIHandler { this.hcatClient = hcatClient; } + public HCatContext(Configuration conf, String user, HCatClientWithToken hcatClient) { + super(conf, user); + this.hcatClient = hcatClient.getHCatClient(); + this.delegationToken = hcatClient.getDelegationToken(); + } + /** * Get the HCatClient to talk to hcatalog server * @@ -287,10 +398,20 @@ public class HCatURIHandler implements URIHandler { return hcatClient; } + /** + * Get the Delegation token to access HCat + * + * @return delegationToken + */ + public String getDelegationToken() { + return delegationToken; + } + @Override public void destroy() { try { hcatClient.close(); + delegationToken = null; } catch (Exception ignore) { XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore); http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/main/java/org/apache/oozie/dependency/URIHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/URIHandler.java b/core/src/main/java/org/apache/oozie/dependency/URIHandler.java index 7280902..6e54d4b 100644 --- a/core/src/main/java/org/apache/oozie/dependency/URIHandler.java +++ b/core/src/main/java/org/apache/oozie/dependency/URIHandler.java @@ -104,12 +104,12 @@ public interface URIHandler { * @param uri URI which identifies the scheme and host * @param conf Configuration to access the URI * @param user name of the user the URI should be accessed as - * + * @param readOnly indicate if operation is read-only * @return Context to access URIs with same scheme and host * * @throws URIHandlerException */ - public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException; + public Context getContext(URI uri, Configuration conf, String user, boolean readOnly) throws URIHandlerException; /** * Check if the dependency identified by the URI is available @@ -140,6 +140,25 @@ public interface URIHandler { public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException; /** + * Delete a URI + * + * @param uri URI + * @param context Context to access the URI + * @throws URIHandlerException + */ + public void delete(URI uri, Context context) throws URIHandlerException; + + /** + * Delete a URI + * + * @param uri URI + * @param conf Configuration to access the URI + * @param user name of the user the URI should be accessed as + * @throws URIHandlerException + */ + public void delete(URI uri, Configuration conf, String user) throws URIHandlerException; + + /** * Get the URI based on the done flag * * @param uri URI of the dependency http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java index ac023ca..2730aa6 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java @@ -18,6 +18,19 @@ package org.apache.oozie.command.coord; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Reader; +import java.io.Writer; +import java.net.URI; +import java.util.Date; +import java.util.List; +import java.util.Properties; +import java.util.regex.Matcher; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.oozie.CoordinatorActionBean; @@ -29,10 +42,25 @@ import org.apache.oozie.client.CoordinatorJob.Execution; import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.command.CommandException; import org.apache.oozie.coord.CoordELFunctions; -import org.apache.oozie.executor.jpa.*; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; import org.apache.oozie.local.LocalOozie; -import org.apache.oozie.service.*; +import org.apache.oozie.service.UUIDService; +import org.apache.oozie.dependency.URIHandler; +import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; +import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; +import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.SchemaService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.StatusTransitService; +import org.apache.oozie.service.StoreService; +import org.apache.oozie.service.URIHandlerService; import org.apache.oozie.store.CoordinatorStore; import org.apache.oozie.store.StoreException; import org.apache.oozie.test.XDataTestCase; @@ -43,12 +71,6 @@ import org.apache.oozie.util.XmlUtils; import org.jdom.Element; import org.jdom.JDOMException; -import java.io.*; -import java.util.Date; -import java.util.List; -import java.util.Properties; -import java.util.regex.Matcher; - public class TestCoordRerunXCommand extends XDataTestCase { private Services services; @@ -620,6 +642,69 @@ public class TestCoordRerunXCommand extends XDataTestCase { } /** + * Test : rerun with refresh option when input dependency is hcat partition + * + * @throws Exception + */ + public void testCoordRerunCleanupForHCat() throws Exception { + + services = super.setupServicesForHCatalog(); + services.init(); + + final String jobId = "0000000-" + new Date().getTime() + "-testCoordRerun-C"; + final int actionNum = 1; + final String actionId = jobId + "@" + actionNum; + CoordinatorStore store = Services.get().get(StoreService.class).getStore(CoordinatorStore.class); + store.beginTrx(); + try { + addRecordToJobTable(jobId, store, CoordinatorJob.Status.SUCCEEDED); + addRecordToActionTable(jobId, actionNum, actionId, store, CoordinatorAction.Status.SUCCEEDED, + "coord-rerun-action1.xml", true); + store.commitTrx(); + } + catch (Exception e) { + e.printStackTrace(); + fail("Could not update db."); + } + finally { + store.closeTrx(); + } + + String db = "mydb"; + String table = "mytable"; + String server = getHCatalogServer().getMetastoreAuthority(); + String newHCatDependency = "hcat://" + server + "/" + db + "/" + table + "/ds=2009121411;region=usa"; + + dropTable(db, table, true); + dropDatabase(db, true); + createDatabase(db); + createTable(db, table, "ds,region"); + addPartition(db, table, "ds=2009121411;region=usa"); + + // before cleanup + Configuration conf = new Configuration(); + URIHandler handler = services.get(URIHandlerService.class).getURIHandler(newHCatDependency); + assertTrue(handler.exists(new URI(newHCatDependency), conf, getTestUser())); + + final OozieClient coordClient = LocalOozie.getCoordClient(); + coordClient.reRunCoord(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, Integer.toString(actionNum), false, false); + + CoordinatorActionBean action2 = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, actionId); + assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED); + + waitFor(120 * 1000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + CoordinatorAction bean = coordClient.getCoordActionInfo(actionId); + return (bean.getStatus() == CoordinatorAction.Status.WAITING || bean.getStatus() == CoordinatorAction.Status.READY); + } + }); + + // after cleanup + assertFalse(handler.exists(new URI(newHCatDependency), conf, getTestUser())); + } + + /** * Test : rerun <jobId> -action 1 with no output-event * * @throws Exception @@ -1066,8 +1151,19 @@ public class TestCoordRerunXCommand extends XDataTestCase { private void addRecordToActionTable(String jobId, int actionNum, String actionId, CoordinatorStore store, CoordinatorAction.Status status, String resourceXmlName) throws StoreException, IOException { + addRecordToActionTable(jobId, actionNum, actionId, store, status, resourceXmlName, false); + } + + private void addRecordToActionTable(String jobId, int actionNum, String actionId, CoordinatorStore store, + CoordinatorAction.Status status, String resourceXmlName, boolean isHCatDep) throws StoreException, + IOException { Path appPath = new Path(getFsTestCaseDir(), "coord"); - String actionXml = getCoordActionXml(appPath, resourceXmlName); + String actionXml = null; + if(isHCatDep != true) { + actionXml = getCoordActionXml(appPath, resourceXmlName); + } else { + actionXml = getCoordActionXmlForHCat(appPath, resourceXmlName); + } String actionNomialTime = getActionNomialTime(actionXml); CoordinatorActionBean action = new CoordinatorActionBean(); @@ -1185,6 +1281,30 @@ public class TestCoordRerunXCommand extends XDataTestCase { } } + protected String getCoordActionXmlForHCat(Path appPath, String resourceXmlName) { + String hcatServer = getHCatalogServer().getMetastoreAuthority(); + String inputTemplate = "hcat://" + hcatServer + "/mydb/mytable/ds=${YEAR}${MONTH}${DAY}${HOUR};region=usa"; + inputTemplate = Matcher.quoteReplacement(inputTemplate); + String outputTemplate = "hcat://" + hcatServer + "/mydb/mytable/ds=${YEAR}${MONTH}${DAY}${HOUR};region=usa"; + outputTemplate = Matcher.quoteReplacement(outputTemplate); + String inputDir = "hcat://" + hcatServer + "/mydb/mytable/ds=2010070501;region=usa"; + inputDir = Matcher.quoteReplacement(inputDir); + String outputDir = "hcat://" + hcatServer + "/mydb/mytable/ds=2009121411;region=usa"; + outputDir = Matcher.quoteReplacement(outputDir); + try { + Reader reader = IOUtils.getResourceAsReader(resourceXmlName, -1); + String appXml = IOUtils.getReaderAsString(reader, -1); + appXml = appXml.replaceAll("#inputTemplate", inputTemplate); + appXml = appXml.replaceAll("#outputTemplate", outputTemplate); + appXml = appXml.replaceAll("#inputDir", inputDir); + appXml = appXml.replaceAll("#outputDir", outputDir); + return appXml; + } + catch (IOException ioe) { + throw new RuntimeException(XLog.format("Could not get " + resourceXmlName, ioe)); + } + } + private String getActionNomialTime(String actionXml) { Element eAction; try { http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java b/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java index 31a68bf..a49eba5 100644 --- a/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java +++ b/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java @@ -95,6 +95,13 @@ public class TestHCatURIHandler extends XHCatTestCase { hcatURI = getHCatURI(db, table, "month=02;dt=02"); assertFalse(handler.exists(hcatURI, conf, getTestUser())); + addPartition(db, table, "year=2012;month=12;dt=04;country=us"); + + hcatURI = getHCatURI(db, table, "country=us;year=2012;month=12;dt=04"); + assertTrue(handler.exists(hcatURI, conf, getTestUser())); + ((HCatURIHandler)handler).delete(hcatURI, conf, getTestUser()); + assertFalse(handler.exists(hcatURI, conf, getTestUser())); + dropTestTable(); } http://git-wip-us.apache.org/repos/asf/oozie/blob/01cb4d55/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 695e1dd..86398ee 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-1985 support dropping hcat dataset in coord rerun with cleanup option (ryota) OOZIE-2053 Change old HCatalog API (ryota) OOZIE-2064 coord job with frequency coord:endOfMonths doesn't materialize (puru) OOZIE-2063 Cron syntax creates duplicate actions (bzhang)
