Author: ryota
Date: Thu Aug  8 07:46:14 2013
New Revision: 1511607

URL: http://svn.apache.org/r1511607
Log:
OOZIE-1471 Support glob in FS action and prepare blocks (ryota)

Modified:
    
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
    
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
    
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java
    oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
    oozie/trunk/release-log.txt
    
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
    
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
    
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java
 Thu Aug  8 07:46:14 2013
@@ -18,17 +18,16 @@
 package org.apache.oozie.action.hadoop;
 
 import java.io.IOException;
-import java.io.StringReader;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
@@ -47,8 +46,12 @@ import org.jdom.Element;
  */
 public class FsActionExecutor extends ActionExecutor {
 
+    private final int maxGlobCount;
+
     public FsActionExecutor() {
         super("fs");
+        maxGlobCount = 
getOozieConf().getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX,
+                LauncherMapper.GLOB_MAX_DEFAULT);
     }
 
     Path getPath(Element element, String attribute) {
@@ -213,7 +216,13 @@ public class FsActionExecutor extends Ac
         argsMap.put("group", group);
         try {
             FileSystem fs = getFileSystemFor(path, context, fsConf);
-            recursiveFsOperation("chgrp", fs, nameNodePath, path, argsMap, 
dirFiles, recursive, true);
+            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
+            if (pathArr != null && pathArr.length > 0) {
+                checkGlobMax(pathArr);
+                for (Path p : pathArr) {
+                    recursiveFsOperation("chgrp", fs, nameNodePath, p, 
argsMap, dirFiles, recursive, true);
+                }
+            }
         }
         catch (Exception ex) {
             throw convertException(ex);
@@ -348,11 +357,16 @@ public class FsActionExecutor extends Ac
         try {
             path = resolveToFullPath(nameNodePath, path, true);
             FileSystem fs = getFileSystemFor(path, context, fsConf);
-
-            if (fs.exists(path)) {
-                if (!fs.delete(path, true)) {
-                    throw new 
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
-                                                      "delete, path [{0}] 
could not delete path", path);
+            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
+            if (pathArr != null && pathArr.length > 0) {
+                checkGlobMax(pathArr);
+                for (Path p : pathArr) {
+                    if (fs.exists(p)) {
+                        if (!fs.delete(p, true)) {
+                            throw new 
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
+                                    "delete, path [{0}] could not delete 
path", p);
+                        }
+                    }
                 }
             }
         }
@@ -416,15 +430,29 @@ public class FsActionExecutor extends Ac
             source = resolveToFullPath(nameNodePath, source, true);
             validateSameNN(source, target);
             FileSystem fs = getFileSystemFor(source, context, fsConf);
-
-            if (!fs.exists(source) && !recovery) {
-                throw new 
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
-                                                  "move, source path [{0}] 
does not exist", source);
+            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(source));
+            if (( pathArr == null || pathArr.length == 0 ) ){
+                if (!recovery) {
+                    throw new 
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
+                        "move, source path [{0}] does not exist", source);
+                } else {
+                    return;
+                }
             }
-
-            if (!fs.rename(source, target) && !recovery) {
-                throw new 
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
-                                                  "move, could not move [{0}] 
to [{1}]", source, target);
+            if (pathArr.length > 1 && (!fs.exists(target) || 
fs.isFile(target))) {
+                if(!recovery) {
+                    throw new 
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS012",
+                            "move, could not rename multiple sources to the 
same target name");
+                } else {
+                    return;
+                }
+            }
+            checkGlobMax(pathArr);
+            for (Path p : pathArr) {
+                if (!fs.rename(p, target) && !recovery) {
+                    throw new 
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
+                            "move, could not move [{0}] to [{1}]", p, target);
+                }
             }
         }
         catch (Exception ex) {
@@ -443,7 +471,13 @@ public class FsActionExecutor extends Ac
         argsMap.put("permissions", permissions);
         try {
             FileSystem fs = getFileSystemFor(path, context, fsConf);
-            recursiveFsOperation("chmod", fs, nameNodePath, path, argsMap, 
dirFiles, recursive, true);
+            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
+            if (pathArr != null && pathArr.length > 0) {
+                checkGlobMax(pathArr);
+                for (Path p : pathArr) {
+                    recursiveFsOperation("chmod", fs, nameNodePath, p, 
argsMap, dirFiles, recursive, true);
+                }
+            }
         }
         catch (Exception ex) {
             throw convertException(ex);
@@ -552,4 +586,11 @@ public class FsActionExecutor extends Ac
         return new Path(context.getActionDir(), "fs-" + 
context.getRecoveryId());
     }
 
+    private void checkGlobMax(Path[] pathArr) throws ActionExecutorException {
+        if(pathArr.length > maxGlobCount) {
+            throw new 
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS013",
+                    "too many globbed files/dirs to do FS operation");
+        }
+    }
+
 }

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
 Thu Aug  8 07:46:14 2013
@@ -95,6 +95,7 @@ public class JavaActionExecutor extends 
     private boolean useLauncherJar;
     private static int maxActionOutputLen;
     private static int maxExternalStatsSize;
+    private static int maxFSGlobMax;
 
     private static final String SUCCEEDED = "SUCCEEDED";
     private static final String KILLED = "KILLED";
@@ -153,6 +154,8 @@ public class JavaActionExecutor extends 
         //Get the limit for the maximum allowed size of action stats
         maxExternalStatsSize = 
getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, 
MAX_EXTERNAL_STATS_SIZE_DEFAULT);
         maxExternalStatsSize = (maxExternalStatsSize == -1) ? 
Integer.MAX_VALUE : maxExternalStatsSize;
+        //Get the limit for the maximum number of globbed files/dirs for FS 
operation
+        maxFSGlobMax = 
getOozieConf().getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, 
LauncherMapper.GLOB_MAX_DEFAULT);
 
         createLauncherJar();
 
@@ -620,6 +623,9 @@ public class JavaActionExecutor extends 
             LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf);
             LauncherMapperHelper.setupMaxOutputData(launcherJobConf, 
maxActionOutputLen);
             LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, 
maxExternalStatsSize);
+            if 
(getOozieConf().get(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX) != null) {
+                LauncherMapperHelper.setupMaxFSGlob(launcherJobConf, 
maxFSGlobMax);
+            }
 
             List<Element> list = actionXml.getChildren("arg", ns);
             String[] args = new String[list.size()];

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
 Thu Aug  8 07:46:14 2013
@@ -95,6 +95,16 @@ public class LauncherMapperHelper {
         launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, 
maxStatsData);
     }
 
+    /**
+     * Set the maximum number of globbed files/dirs
+     *
+     * @param launcherConf the oozie launcher configuration
+     * @param fsGlobMax the maximum number of files/dirs for FS operation
+     */
+    public static void setupMaxFSGlob(Configuration launcherConf, int 
fsGlobMax){
+        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, 
fsGlobMax);
+    }
+
     public static void setupLauncherInfo(JobConf launcherConf, String jobId, 
String actionId, Path actionDir,
             String recoveryId, Configuration actionConf, String prepareXML) 
throws IOException, HadoopAccessorException {
 

Modified: 
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
--- 
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
 (original)
+++ 
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
 Thu Aug  8 07:46:14 2013
@@ -63,6 +63,35 @@ public class TestFSPrepareActions extend
         assertFalse(fs.exists(newDir));
     }
 
+    // Test for delete as prepare action with glob
+    @Test
+    public void testDeleteWithGlob() throws Exception {
+        Path actionDir = getFsTestCaseDir();
+        FileSystem fs = getFileSystem();
+        Path newDir = new Path(actionDir, "newDir");
+        // Delete the file if it is already there
+        if (fs.exists(newDir)) {
+            fs.delete(newDir, true);
+        }
+        fs.mkdirs(newDir);
+        fs.mkdirs(new Path(newDir, "2010"));
+        fs.mkdirs(new Path(newDir + "/2010/10"));
+        fs.mkdirs(new Path(newDir, "2011"));
+        fs.mkdirs(new Path(newDir + "/2011/10"));
+        fs.mkdirs(new Path(newDir, "2012"));
+        fs.mkdirs(new Path(newDir + "/2012/10"));
+        // Prepare block that contains delete action
+        String prepareXML = "<prepare>" + "<delete path='" + newDir + 
"/201[0-1]/*" + "'/>" + "</prepare>";
+
+        JobConf conf = createJobConf();
+        LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
+        PrepareActionsDriver.doOperations(prepareXML, conf);
+        assertFalse(fs.exists(new Path(newDir + "/2010/10")));
+        assertFalse(fs.exists(new Path(newDir + "/2011/10")));
+        assertTrue(fs.exists(new Path(newDir + "/2012/10")));
+        fs.delete(newDir, true);
+    }
+
     // Test for mkdir as prepare action
     @Test
     public void testMkdir() throws Exception {

Modified: 
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
--- 
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java
 (original)
+++ 
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java
 Thu Aug  8 07:46:14 2013
@@ -238,6 +238,32 @@ public class TestFsActionExecutor extend
         assertTrue(!fs.exists(path));
 
         ae.delete(context, path);
+
+    }
+
+    public void testDeleteWithGlob() throws Exception {
+
+        FsActionExecutor ae = new FsActionExecutor();
+        FileSystem fs = getFileSystem();
+        Context context = createContext("<fs/>");
+        Path basePath = new Path(getFsTestCaseDir(), "2010");
+        fs.mkdirs(basePath);
+        fs.mkdirs(new Path(basePath, "10"));
+        fs.createNewFile(new Path(basePath + "/10/newfile1"));
+        fs.createNewFile(new Path(basePath + "/10/newfile2"));
+        fs.mkdirs(new Path(basePath, "11"));
+        fs.createNewFile(new Path(basePath + "/11/newfile3"));
+        fs.mkdirs(new Path(basePath, "12"));
+        fs.createNewFile(new Path(basePath + "/12/newfile4"));
+
+        Path globPath = new Path(basePath +"/1{0,1}/*");
+        ae.delete(context, globPath);
+        assertFalse(fs.exists(new Path(basePath + "/10/newfile1")));
+        assertFalse(fs.exists(new Path(basePath + "/10/newfile2")));
+        assertFalse(fs.exists(new Path(basePath + "/11/newfile3")));
+        assertTrue(fs.exists(new Path(basePath + "/12/newfile4")));
+
+        fs.delete(basePath, true);
     }
 
     public void testMove() throws Exception {
@@ -293,6 +319,71 @@ public class TestFsActionExecutor extend
         assertTrue(fs.exists(target));
     }
 
+    public void testMoveWithGlob() throws Exception {
+        FsActionExecutor ae = new FsActionExecutor();
+        FileSystem fs = getFileSystem();
+        Path source = new Path(getFsTestCaseDir(), "source");
+        Path target = new Path(getFsTestCaseDir(), "target");
+        Context context = createContext("<fs/>");
+
+        // Test simple example of glob
+        fs.mkdirs(source);
+        fs.mkdirs(target);
+        fs.createNewFile(new Path(source,"newfile1"));
+        fs.createNewFile(new Path(source,"newfile2"));
+
+        ae.move(context, new Path(source.toString() + "/*"), target, false);
+        assertTrue(fs.exists(new Path(target, "newfile1")));
+        assertTrue(fs.exists(new Path(target, "newfile2")));
+
+        // Test another example of glob
+        fs.delete(target, true);
+        fs.mkdirs(target);
+        fs.mkdirs(new Path(source + "/2010"));
+        fs.mkdirs(new Path(source + "/2011"));
+        fs.mkdirs(new Path(source + "/2012"));
+        fs.mkdirs(new Path(source + "/2010/10"));
+        fs.mkdirs(new Path(source + "/2010/11"));
+        fs.createNewFile(new Path(source + "/2010/10/newfile1"));
+        fs.createNewFile(new Path(source + "/2010/11/newfile2"));
+        fs.mkdirs(new Path(source + "/2011/09"));
+        fs.mkdirs(new Path(source + "/2011/10"));
+        fs.createNewFile(new Path(source + "/2011/09/newfile3"));
+        fs.createNewFile(new Path(source + "/2011/10/newfile4"));
+
+        ae.move(context, new Path(source.toString() + "/201[0-1]/1{0,1}/*"), 
target, false);
+        assertTrue(fs.exists(new Path(target.toString() + "/newfile1")));
+        assertTrue(fs.exists(new Path(target.toString() + "/newfile2")));
+        assertFalse(fs.exists(new Path(target.toString() + "/newfile3")));
+        assertTrue(fs.exists(new Path(target.toString() + "/newfile4")));
+
+        fs.delete(new Path(source + "/2010"), true);
+        fs.delete(new Path(source + "/2011"), true);
+        fs.delete(new Path(source + "/2012"), true);
+
+        // Catch exception when trying to move multiple files (match glob) to
+        // the same name which doesn't exist
+        fs.delete(target, true);
+        try {
+            ae.move(context, new Path(source.toString() + "/*"), target, true);
+        }
+        catch (ActionExecutorException ex) {
+            assertEquals("FS012", ex.getErrorCode());
+        }
+
+        // Catch exception when trying to move multiple files (match glob) to
+        // the same file name which exists
+        fs.delete(target, true);
+        Path targetFile = new Path(target, "newfile1");
+        fs.createNewFile(targetFile);
+        try {
+            ae.move(context, new Path(source.toString() + "/*"), targetFile, 
true);
+        }
+        catch (ActionExecutorException ex) {
+            assertEquals("FS012", ex.getErrorCode());
+        }
+    }
+
     public void testChmod() throws Exception {
         FsActionExecutor ae = new FsActionExecutor();
         FileSystem fs = getFileSystem();
@@ -321,6 +412,59 @@ public class TestFsActionExecutor extend
         assertEquals("rwx---r--", 
fs.getFileStatus(grandchild).getPermission().toString());
     }
 
+    public void testChmodWithGlob() throws Exception {
+        FsActionExecutor ae = new FsActionExecutor();
+        FileSystem fs = getFileSystem();
+        Context context = createContext("<fs/>");
+        Path basePath = new Path(getFsTestCaseDir(), "2010");
+        fs.mkdirs(basePath);
+        fs.mkdirs(new Path(basePath, "10"));
+        fs.mkdirs(new Path(basePath + "/10/dir1"));
+        fs.createNewFile(new Path(basePath + "/10/dir1/file1"));
+        fs.mkdirs(new Path(basePath + "/10/dir2"));
+        fs.mkdirs(new Path(basePath, "11"));
+        fs.mkdirs(new Path(basePath + "/11/dir3"));
+        fs.mkdirs(new Path(basePath, "12"));
+
+        fs.setPermission(new Path(basePath, "10"), 
FsPermission.valueOf("-rwxrwxrwx"));
+        fs.setPermission(new Path(basePath + "/10/dir1"), 
FsPermission.valueOf("-rwxrwxrwx"));
+        fs.setPermission(new Path(basePath + "/10/dir2"), 
FsPermission.valueOf("-rwxrwxrwx"));
+        fs.setPermission(new Path(basePath + "/10/dir1/file1"), 
FsPermission.valueOf("-rw-rw-rw-"));
+        fs.setPermission(new Path(basePath, "11"), 
FsPermission.valueOf("-rwxrwxrwx"));
+        fs.setPermission(new Path(basePath + "/11/dir3"), 
FsPermission.valueOf("-rwxrwxrwx"));
+        fs.setPermission(new Path(basePath, "12"), 
FsPermission.valueOf("-rwxrwxrwx"));
+
+        Path globPath = new Path(basePath +"/1[0-1]");
+        ae.chmod(context, globPath, "-rwx------", false, false);
+        assertEquals("rwx------", fs.getFileStatus(new Path(basePath, 
"10")).getPermission().toString());
+        assertEquals("rwx------", fs.getFileStatus(new Path(basePath, 
"11")).getPermission().toString());
+        assertEquals("rwxrwxrwx", fs.getFileStatus(new Path(basePath, 
"12")).getPermission().toString());
+        assertEquals("rwxrwxrwx", fs.getFileStatus(new Path(basePath + 
"/10/dir1")).getPermission().toString());
+        assertEquals("rwxrwxrwx", fs.getFileStatus(new Path(basePath + 
"/10/dir2")).getPermission().toString());
+        assertEquals("rwxrwxrwx", fs.getFileStatus(new Path(basePath + 
"/11/dir3")).getPermission().toString());
+        assertEquals("rw-rw-rw-", fs.getFileStatus(new Path(basePath + 
"/10/dir1/file1")).getPermission().toString());
+
+        ae.chmod(context, globPath, "-rwx------", true, false);
+        assertEquals("rwx------", fs.getFileStatus(new Path(basePath, 
"10")).getPermission().toString());
+        assertEquals("rwx------", fs.getFileStatus(new Path(basePath, 
"11")).getPermission().toString());
+        assertEquals("rwxrwxrwx", fs.getFileStatus(new Path(basePath, 
"12")).getPermission().toString());
+        assertEquals("rwx------", fs.getFileStatus(new Path(basePath + 
"/10/dir1")).getPermission().toString());
+        assertEquals("rwx------", fs.getFileStatus(new Path(basePath + 
"/10/dir2")).getPermission().toString());
+        assertEquals("rwx------", fs.getFileStatus(new Path(basePath + 
"/11/dir3")).getPermission().toString());
+        assertEquals("rw-rw-rw-", fs.getFileStatus(new Path(basePath + 
"/10/dir1/file1")).getPermission().toString());
+
+        ae.chmod(context, globPath, "-rwx------", true, true);
+        assertEquals("rwx------", fs.getFileStatus(new Path(basePath, 
"10")).getPermission().toString());
+        assertEquals("rwx------", fs.getFileStatus(new Path(basePath, 
"11")).getPermission().toString());
+        assertEquals("rwxrwxrwx", fs.getFileStatus(new Path(basePath, 
"12")).getPermission().toString());
+        assertEquals("rwx------", fs.getFileStatus(new Path(basePath + 
"/10/dir1")).getPermission().toString());
+        assertEquals("rwx------", fs.getFileStatus(new Path(basePath + 
"/10/dir2")).getPermission().toString());
+        assertEquals("rwx------", fs.getFileStatus(new Path(basePath + 
"/11/dir3")).getPermission().toString());
+        assertEquals("rw-------", fs.getFileStatus(new Path(basePath + 
"/10/dir1/file1")).getPermission().toString());
+
+        fs.delete(basePath, true);
+    }
+
 public void testChmodRecursive() throws Exception {
         FsActionExecutor ae = new FsActionExecutor();
         FileSystem fs = getFileSystem();
@@ -815,4 +959,61 @@ public void testChmodRecursive() throws 
         assertEquals(testGroup2, 
fs.getFileStatus(child).getGroup().toString());
         assertEquals(testGroup2, 
fs.getFileStatus(grandchild).getGroup().toString());
     }
+
+    public void testChgrpWithGlob() throws Exception {
+
+        String testUser = getTestUser();
+        String testGroup = getTestGroup();
+        String testGroup2 = getTestGroup2();
+        FsActionExecutor ae = new FsActionExecutor();
+        FileSystem fs = getFileSystem();
+        Context context = createContext("<fs/>");
+        Path basePath = new Path(getFsTestCaseDir(), "2010");
+        fs.mkdirs(basePath);
+        fs.mkdirs(new Path(basePath, "10"));
+        fs.mkdirs(new Path(basePath + "/10/dir1"));
+        fs.createNewFile(new Path(basePath + "/10/dir1/file1"));
+        fs.mkdirs(new Path(basePath + "/10/dir2"));
+        fs.mkdirs(new Path(basePath, "11"));
+        fs.mkdirs(new Path(basePath + "/11/dir3"));
+        fs.mkdirs(new Path(basePath, "12"));
+
+        fs.setOwner(new Path(basePath, "10"), testUser, testGroup);
+        fs.setOwner(new Path(basePath + "/10/dir1"), testUser, testGroup);
+        fs.setOwner(new Path(basePath + "/10/dir1/file1"), testUser, 
testGroup);
+        fs.setOwner(new Path(basePath + "/10/dir2"), testUser, testGroup);
+        fs.setOwner(new Path(basePath, "11"), testUser, testGroup);
+        fs.setOwner(new Path(basePath + "/11/dir3"), testUser, testGroup);
+        fs.setOwner(new Path(basePath, "12"), testUser, testGroup);
+
+        Path globPath = new Path(basePath +"/1[0-1]");
+        ae.chgrp(context, null, null, globPath, testUser, testGroup2, false, 
false);
+        assertEquals(testGroup2, fs.getFileStatus(new Path(basePath, 
"10")).getGroup().toString());
+        assertEquals(testGroup2, fs.getFileStatus(new Path(basePath, 
"11")).getGroup().toString());
+        assertEquals(testGroup, fs.getFileStatus(new Path(basePath, 
"12")).getGroup().toString());
+        assertEquals(testGroup, fs.getFileStatus(new Path(basePath + 
"/10/dir1")).getGroup().toString());
+        assertEquals(testGroup, fs.getFileStatus(new Path(basePath + 
"/10/dir2")).getGroup().toString());
+        assertEquals(testGroup, fs.getFileStatus(new Path(basePath + 
"/11/dir3")).getGroup().toString());
+        assertEquals(testGroup, fs.getFileStatus(new Path(basePath + 
"/10/dir1/file1")).getGroup().toString());
+
+        ae.chgrp(context, null, null, globPath, testUser, testGroup2, true, 
false);
+        assertEquals(testGroup2, fs.getFileStatus(new Path(basePath, 
"10")).getGroup().toString());
+        assertEquals(testGroup2, fs.getFileStatus(new Path(basePath, 
"11")).getGroup().toString());
+        assertEquals(testGroup, fs.getFileStatus(new Path(basePath, 
"12")).getGroup().toString());
+        assertEquals(testGroup2, fs.getFileStatus(new Path(basePath + 
"/10/dir1")).getGroup().toString());
+        assertEquals(testGroup2, fs.getFileStatus(new Path(basePath + 
"/10/dir2")).getGroup().toString());
+        assertEquals(testGroup2, fs.getFileStatus(new Path(basePath + 
"/11/dir3")).getGroup().toString());
+        assertEquals(testGroup, fs.getFileStatus(new Path(basePath + 
"/10/dir1/file1")).getGroup().toString());
+
+        ae.chgrp(context, null, null, globPath, testUser, testGroup2, true, 
true);
+        assertEquals(testGroup2, fs.getFileStatus(new Path(basePath, 
"10")).getGroup().toString());
+        assertEquals(testGroup2, fs.getFileStatus(new Path(basePath, 
"11")).getGroup().toString());
+        assertEquals(testGroup, fs.getFileStatus(new Path(basePath, 
"12")).getGroup().toString());
+        assertEquals(testGroup2, fs.getFileStatus(new Path(basePath + 
"/10/dir1")).getGroup().toString());
+        assertEquals(testGroup2, fs.getFileStatus(new Path(basePath + 
"/10/dir2")).getGroup().toString());
+        assertEquals(testGroup2, fs.getFileStatus(new Path(basePath + 
"/11/dir3")).getGroup().toString());
+        assertEquals(testGroup2, fs.getFileStatus(new Path(basePath + 
"/10/dir1/file1")).getGroup().toString());
+
+        fs.delete(basePath, true);
+    }
 }

Modified: oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
URL: 
http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
--- oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki (original)
+++ oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki Thu Aug  8 
07:46:14 2013
@@ -1033,6 +1033,8 @@ The FS commands are executed synchronous
 file commands are completed before continuing to the next action.
 
 Path names specified in the =fs= action can be parameterized (templatized) 
using EL expressions.
+Path names should be specified as absolute paths. In case of =move=, =delete=, 
=chmod= and =chgrp= commands, a glob pattern can also be specified instead of 
an absolute path.
+For =move=, a glob pattern can only be specified for the source path and not the 
target.
 
 Each file path must specify the file system URI, for move operations, the 
target must not specified the system URI.
 
@@ -1404,6 +1406,7 @@ be assigned to it. The queue name must b
 
 The =prepare= element, if present, indicates a list of paths to delete before 
starting the Java application. This should
 be used exclusively for directory cleanup or dropping of hcatalog table 
partitions for the Java application to be executed.
+In case of =delete=, a glob pattern can be used to specify the path.
 The format to specify a hcatalog table partition URI is
 hcat://[metastore server]:[port]/[database name]/[table 
name]/[partkey1]=[value];[partkey2]=[value].
 In case of a hcatalog URI, the hive-site.xml needs to be shipped using =file= 
tag and the hcatalog and hive jars

Modified: oozie/trunk/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Thu Aug  8 07:46:14 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1471 Support glob in FS action and prepare blocks (ryota)
 OOZIE-1403 forkjoin validation blocks some valid cases involving decision 
nodes (rkanter)
 OOZIE-1449 Coordinator Workflow parent relationship is broken for purge 
service (rkanter)
 OOZIE-1458 If a Credentials type is not defined, Oozie should say something 
(rkanter)

Modified: 
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
--- 
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
 (original)
+++ 
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
 Thu Aug  8 07:46:14 2013
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
 public class FSLauncherURIHandler implements LauncherURIHandler {
@@ -54,14 +55,23 @@ public class FSLauncherURIHandler implem
         boolean status = false;
         try {
             FileSystem fs = FileSystem.get(uri, conf);
-            Path path = getNormalizedPath(uri);
-            if (fs.exists(path)) {
-                status = fs.delete(path, true);
-                if (status) {
-                    System.out.println("Deletion of path " + path + " 
succeeded.");
+            Path[] pathArr = 
FileUtil.stat2Paths(fs.globStatus(getNormalizedPath(uri)));
+            if (pathArr != null && pathArr.length > 0) {
+                int fsGlobMax = 
conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, 1000);
+                if (pathArr.length > fsGlobMax) {
+                    throw new LauncherException("exceeds max number (" + 
fsGlobMax
+                            + ") of files/dirs to delete in <prepare>");
                 }
-                else {
-                    System.out.println("Deletion of path " + path + " 
failed.");
+                for (Path path : pathArr) {
+                    if (fs.exists(path)) {
+                        status = fs.delete(path, true);
+                        if (status) {
+                            System.out.println("Deletion of path " + path + " 
succeeded.");
+                        }
+                        else {
+                            System.out.println("Deletion of path " + path + " 
failed.");
+                        }
+                    }
                 }
             }
         }

Modified: 
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
--- 
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
 (original)
+++ 
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
 Thu Aug  8 07:46:14 2013
@@ -55,6 +55,8 @@ public class LauncherMapper<K1, V1, K2, 
     static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = 
"oozie.action.main.arg.count";
     static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = 
"oozie.action.main.arg.";
     static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = 
"oozie.external.stats.max.size";
+    static final String CONF_OOZIE_ACTION_FS_GLOB_MAX = 
"oozie.action.fs.glob.max";
+    static final int GLOB_MAX_DEFAULT = 1000;
 
     static final String COUNTER_GROUP = "oozie.launcher";
     static final String COUNTER_DO_ID_SWAP = "oozie.do.id.swap";

Modified: 
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
--- 
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
 (original)
+++ 
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
 Thu Aug  8 07:46:14 2013
@@ -21,9 +21,9 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
-import java.net.URISyntaxException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.xml.sax.SAXException;
 import org.w3c.dom.Document;
 import org.w3c.dom.Node;
@@ -60,8 +60,9 @@ public class PrepareActionsDriver {
                 if (n.getAttributes() == null || 
n.getAttributes().getNamedItem("path") == null) {
                     continue;
                 }
-                String path = 
n.getAttributes().getNamedItem("path").getNodeValue().trim();
-                URI uri = new URI(path);
+                String pathStr = 
n.getAttributes().getNamedItem("path").getNodeValue().trim();
+                // use Path to avoid URIsyntax error caused by square bracket 
in glob
+                URI uri = new Path(pathStr).toUri();
                 LauncherURIHandler handler = factory.getURIHandler(uri);
                 execute(operation, uri, handler, conf);
             }
@@ -71,7 +72,7 @@ public class PrepareActionsDriver {
             throw new LauncherException(saxe.getMessage(), saxe);
         } catch (ParserConfigurationException pce) {
             throw new LauncherException(pce.getMessage(), pce);
-        } catch (URISyntaxException use) {
+        } catch (IllegalArgumentException use) {
             throw new LauncherException(use.getMessage(), use);
         }
     }


Reply via email to