Repository: oozie Updated Branches: refs/heads/master 9959e2ca0 -> 4d43fe8c4
OOZIE-2287 Add support for deleting hcat partitions in fs action delete (kailongs via rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/4d43fe8c Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/4d43fe8c Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/4d43fe8c Branch: refs/heads/master Commit: 4d43fe8c4b4f40b97bf6b3d8d135a3606b98c790 Parents: 9959e2c Author: Rohini Palaniswamy <[email protected]> Authored: Tue Jul 7 12:50:45 2015 -0700 Committer: Rohini Palaniswamy <[email protected]> Committed: Tue Jul 7 12:50:45 2015 -0700 ---------------------------------------------------------------------- .../oozie/action/hadoop/FsActionExecutor.java | 35 ++++++++----- .../action/hadoop/ActionExecutorTestCase.java | 4 +- .../action/hadoop/TestFsActionExecutor.java | 52 ++++++++++++++++++++ .../src/site/twiki/WorkflowFunctionalSpec.twiki | 2 +- release-log.txt | 1 + 5 files changed, 81 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/4d43fe8c/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java index b25ac52..8d96a47 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java @@ -19,6 +19,7 @@ package org.apache.oozie.action.hadoop; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; @@ -36,10 +37,13 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.dependency.FSURIHandler; +import org.apache.oozie.dependency.URIHandler; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.HadoopAccessorException; import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.Services; +import org.apache.oozie.service.URIHandlerService; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; @@ -363,20 +367,28 @@ public class FsActionExecutor extends ActionExecutor { * @throws ActionExecutorException */ public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException { + URI uri = path.toUri(); + URIHandler handler; try { - path = resolveToFullPath(nameNodePath, path, true); - FileSystem fs = getFileSystemFor(path, context, fsConf); - Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path)); - if (pathArr != null && pathArr.length > 0) { - checkGlobMax(pathArr); - for (Path p : pathArr) { - if (fs.exists(p)) { - if (!fs.delete(p, true)) { - throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005", - "delete, path [{0}] could not delete path", p); + handler = Services.get().get(URIHandlerService.class).getURIHandler(uri); + if (handler instanceof FSURIHandler) { + // Use legacy code to handle hdfs partition deletion + path = resolveToFullPath(nameNodePath, path, true); + FileSystem fs = getFileSystemFor(path, context, fsConf); + Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path)); + if (pathArr != null && pathArr.length > 0) { + checkGlobMax(pathArr); + for (Path p : pathArr) { + if (fs.exists(p)) { + if (!fs.delete(p, true)) { + throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005", + "delete, path [{0}] could not delete path", p); + } } } } + } else { + handler.delete(uri, handler.getContext(uri, fsConf, context.getWorkflow().getUser(), false)); } } catch (Exception ex) { @@ -511,8 +523,9 @@ public class FsActionExecutor extends ActionExecutor { st = fs.getFileStatus(path); if (st.isDir()) { throw new Exception(path.toString() + " is a directory"); - } else if (st.getLen() != 0) + } else if (st.getLen() != 0) { throw new Exception(path.toString() + " must be a zero-length file"); + } } FSDataOutputStream out = fs.create(path); out.close(); http://git-wip-us.apache.org/repos/asf/oozie/blob/4d43fe8c/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java index b08c236..e1c450c 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java @@ -36,6 +36,8 @@ import org.apache.oozie.service.UUIDService; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.service.WorkflowStoreService; import org.apache.oozie.test.XFsTestCase; +import org.apache.oozie.test.XHCatTestCase; +import org.apache.oozie.test.XTestCase; import org.apache.oozie.util.ELEvaluator; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; @@ -58,7 +60,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; -public abstract class ActionExecutorTestCase extends XFsTestCase { +public abstract class ActionExecutorTestCase extends XHCatTestCase { @Override protected void setUp() throws Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/4d43fe8c/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java index 4ac6ffe..86d5fa9 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java @@ -22,22 +22,34 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapred.JobConf; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.dependency.FSURIHandler; +import org.apache.oozie.dependency.HCatURIHandler; +import org.apache.oozie.dependency.URIHandler; import org.apache.oozie.action.ActionExecutorException; +import org.apache.oozie.service.HCatAccessorService; import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.Services; +import org.apache.oozie.service.URIHandlerService; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; import java.io.OutputStreamWriter; import java.io.Writer; +import java.net.URI; import java.text.MessageFormat; public class TestFsActionExecutor extends ActionExecutorTestCase { + final String db = "db1"; + final String table = "table1"; + private Services services; + private URIHandlerService uriService; + private JobConf conf; protected void setSystemProps() throws Exception { super.setSystemProps(); setSystemProperty("oozie.service.ActionService.executor.classes", FsActionExecutor.class.getName()); @@ -245,6 +257,46 @@ public class TestFsActionExecutor extends ActionExecutorTestCase { } + @Override + public void setUp() throws Exception { + super.setUp(); + services = new Services(); + services.getConf().set(URIHandlerService.URI_HANDLERS, + FSURIHandler.class.getName() + "," + HCatURIHandler.class.getName()); + services.setService(HCatAccessorService.class); + services.init(); + conf = createJobConf(); + uriService = Services.get().get(URIHandlerService.class); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + private void createTestTable() throws Exception { + dropTable(db, table, true); + dropDatabase(db, true); + createDatabase(db); + createTable(db, table, "year,month,dt,country"); + } + + public void testDeleteHcat() throws Exception { + createTestTable(); + addPartition(db, table, "year=2012;month=12;dt=02;country=us"); + URI hcatURI = getHCatURI(db, table, "country=us;year=2012;month=12;dt=02"); + URIHandler handler = uriService.getURIHandler(hcatURI); + FsActionExecutor ae = new FsActionExecutor(); + Path path = new Path(hcatURI); + Path nameNodePath = new Path(getNameNodeUri()); + Context context = createContext("<fs/>"); + XConfiguration conf = new XConfiguration(); + assertTrue(handler.exists(hcatURI, conf, getTestUser())); + ae.delete(context, conf, nameNodePath, path); + assertFalse(handler.exists(hcatURI, conf, getTestUser())); + } + public void testDeleteWithGlob() throws Exception { FsActionExecutor ae = new FsActionExecutor(); http://git-wip-us.apache.org/repos/asf/oozie/blob/4d43fe8c/docs/src/site/twiki/WorkflowFunctionalSpec.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki index 6b55117..bf4d123 100644 --- a/docs/src/site/twiki/WorkflowFunctionalSpec.twiki +++ b/docs/src/site/twiki/WorkflowFunctionalSpec.twiki @@ -1178,7 +1178,7 @@ executed. Thus there is less chance of an error occurring while the =fs= action </verbatim> The =delete= command deletes the specified path, if it is a directory it deletes recursively all its content and then -deletes the directory. +deletes the directory. It can also be used to drop hcat partitions. This is the only FS command which supports HCatalog URIs as well. For eg: <verbatim><delete path='hcat://[metastore server]:[port]/[database name]/[table name]/[partkey1]=[value];[partkey2]=[value];...'/></verbatim> The =mkdir= command creates the specified directory, it creates all missing directories in the path. If the directory already exist it does a no-op. http://git-wip-us.apache.org/repos/asf/oozie/blob/4d43fe8c/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 73ef9c7..3a990bb 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2287 Add support for deleting hcat partitions in fs action delete (kailongs via rohini) OOZIE-2285 Change in concurrency should trigger coord action ready command (kailongs via rohini) OOZIE-2284 HBaseCredentials should only add hbase-default.xml and hbase-site.xml to actionConf (rohini) OOZIE-2286 Update Log4j and Log4j-extras to latest 1.2.x release (rkanter)
