Author: ryota
Date: Thu Aug 8 07:46:14 2013
New Revision: 1511607
URL: http://svn.apache.org/r1511607
Log:
OOZIE-1471 Support glob in FS action and prepare blocks (ryota)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java
oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
oozie/trunk/release-log.txt
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java
Thu Aug 8 07:46:14 2013
@@ -18,17 +18,16 @@
package org.apache.oozie.action.hadoop;
import java.io.IOException;
-import java.io.StringReader;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobConf;
@@ -47,8 +46,12 @@ import org.jdom.Element;
*/
public class FsActionExecutor extends ActionExecutor {
+ private final int maxGlobCount;
+
public FsActionExecutor() {
super("fs");
+ maxGlobCount =
getOozieConf().getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX,
+ LauncherMapper.GLOB_MAX_DEFAULT);
}
Path getPath(Element element, String attribute) {
@@ -213,7 +216,13 @@ public class FsActionExecutor extends Ac
argsMap.put("group", group);
try {
FileSystem fs = getFileSystemFor(path, context, fsConf);
- recursiveFsOperation("chgrp", fs, nameNodePath, path, argsMap,
dirFiles, recursive, true);
+ Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
+ if (pathArr != null && pathArr.length > 0) {
+ checkGlobMax(pathArr);
+ for (Path p : pathArr) {
+ recursiveFsOperation("chgrp", fs, nameNodePath, p,
argsMap, dirFiles, recursive, true);
+ }
+ }
}
catch (Exception ex) {
throw convertException(ex);
@@ -348,11 +357,16 @@ public class FsActionExecutor extends Ac
try {
path = resolveToFullPath(nameNodePath, path, true);
FileSystem fs = getFileSystemFor(path, context, fsConf);
-
- if (fs.exists(path)) {
- if (!fs.delete(path, true)) {
- throw new
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
- "delete, path [{0}]
could not delete path", path);
+ Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
+ if (pathArr != null && pathArr.length > 0) {
+ checkGlobMax(pathArr);
+ for (Path p : pathArr) {
+ if (fs.exists(p)) {
+ if (!fs.delete(p, true)) {
+ throw new
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
+ "delete, path [{0}] could not delete
path", p);
+ }
+ }
}
}
}
@@ -416,15 +430,29 @@ public class FsActionExecutor extends Ac
source = resolveToFullPath(nameNodePath, source, true);
validateSameNN(source, target);
FileSystem fs = getFileSystemFor(source, context, fsConf);
-
- if (!fs.exists(source) && !recovery) {
- throw new
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
- "move, source path [{0}]
does not exist", source);
+ Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(source));
+ if (( pathArr == null || pathArr.length == 0 ) ){
+ if (!recovery) {
+ throw new
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
+ "move, source path [{0}] does not exist", source);
+ } else {
+ return;
+ }
}
-
- if (!fs.rename(source, target) && !recovery) {
- throw new
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
- "move, could not move [{0}]
to [{1}]", source, target);
+ if (pathArr.length > 1 && (!fs.exists(target) ||
fs.isFile(target))) {
+ if(!recovery) {
+ throw new
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS012",
+ "move, could not rename multiple sources to the
same target name");
+ } else {
+ return;
+ }
+ }
+ checkGlobMax(pathArr);
+ for (Path p : pathArr) {
+ if (!fs.rename(p, target) && !recovery) {
+ throw new
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
+ "move, could not move [{0}] to [{1}]", p, target);
+ }
}
}
catch (Exception ex) {
@@ -443,7 +471,13 @@ public class FsActionExecutor extends Ac
argsMap.put("permissions", permissions);
try {
FileSystem fs = getFileSystemFor(path, context, fsConf);
- recursiveFsOperation("chmod", fs, nameNodePath, path, argsMap,
dirFiles, recursive, true);
+ Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
+ if (pathArr != null && pathArr.length > 0) {
+ checkGlobMax(pathArr);
+ for (Path p : pathArr) {
+ recursiveFsOperation("chmod", fs, nameNodePath, p,
argsMap, dirFiles, recursive, true);
+ }
+ }
}
catch (Exception ex) {
throw convertException(ex);
@@ -552,4 +586,11 @@ public class FsActionExecutor extends Ac
return new Path(context.getActionDir(), "fs-" +
context.getRecoveryId());
}
+ private void checkGlobMax(Path[] pathArr) throws ActionExecutorException {
+ if(pathArr.length > maxGlobCount) {
+ throw new
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS013",
+ "too many globbed files/dirs to do FS operation");
+ }
+ }
+
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
Thu Aug 8 07:46:14 2013
@@ -95,6 +95,7 @@ public class JavaActionExecutor extends
private boolean useLauncherJar;
private static int maxActionOutputLen;
private static int maxExternalStatsSize;
+ private static int maxFSGlobMax;
private static final String SUCCEEDED = "SUCCEEDED";
private static final String KILLED = "KILLED";
@@ -153,6 +154,8 @@ public class JavaActionExecutor extends
//Get the limit for the maximum allowed size of action stats
maxExternalStatsSize =
getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE,
MAX_EXTERNAL_STATS_SIZE_DEFAULT);
maxExternalStatsSize = (maxExternalStatsSize == -1) ?
Integer.MAX_VALUE : maxExternalStatsSize;
+ //Get the limit for the maximum number of globbed files/dirs for FS
operation
+ maxFSGlobMax =
getOozieConf().getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX,
LauncherMapper.GLOB_MAX_DEFAULT);
createLauncherJar();
@@ -620,6 +623,9 @@ public class JavaActionExecutor extends
LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf);
LauncherMapperHelper.setupMaxOutputData(launcherJobConf,
maxActionOutputLen);
LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf,
maxExternalStatsSize);
+ if
(getOozieConf().get(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX) != null) {
+ LauncherMapperHelper.setupMaxFSGlob(launcherJobConf,
maxFSGlobMax);
+ }
List<Element> list = actionXml.getChildren("arg", ns);
String[] args = new String[list.size()];
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
Thu Aug 8 07:46:14 2013
@@ -95,6 +95,16 @@ public class LauncherMapperHelper {
launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
maxStatsData);
}
+ /**
+ * Set the maximum number of globbed files/dirs
+ *
+ * @param launcherConf the oozie launcher configuration
+ * @param fsGlobMax the maximum number of files/dirs for FS operation
+ */
+ public static void setupMaxFSGlob(Configuration launcherConf, int
fsGlobMax){
+ launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX,
fsGlobMax);
+ }
+
public static void setupLauncherInfo(JobConf launcherConf, String jobId,
String actionId, Path actionDir,
String recoveryId, Configuration actionConf, String prepareXML)
throws IOException, HadoopAccessorException {
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
(original)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
Thu Aug 8 07:46:14 2013
@@ -63,6 +63,35 @@ public class TestFSPrepareActions extend
assertFalse(fs.exists(newDir));
}
+ // Test for delete as prepare action with glob
+ @Test
+ public void testDeleteWithGlob() throws Exception {
+ Path actionDir = getFsTestCaseDir();
+ FileSystem fs = getFileSystem();
+ Path newDir = new Path(actionDir, "newDir");
+ // Delete the file if it is already there
+ if (fs.exists(newDir)) {
+ fs.delete(newDir, true);
+ }
+ fs.mkdirs(newDir);
+ fs.mkdirs(new Path(newDir, "2010"));
+ fs.mkdirs(new Path(newDir + "/2010/10"));
+ fs.mkdirs(new Path(newDir, "2011"));
+ fs.mkdirs(new Path(newDir + "/2011/10"));
+ fs.mkdirs(new Path(newDir, "2012"));
+ fs.mkdirs(new Path(newDir + "/2012/10"));
+ // Prepare block that contains delete action
+ String prepareXML = "<prepare>" + "<delete path='" + newDir +
"/201[0-1]/*" + "'/>" + "</prepare>";
+
+ JobConf conf = createJobConf();
+ LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
+ PrepareActionsDriver.doOperations(prepareXML, conf);
+ assertFalse(fs.exists(new Path(newDir + "/2010/10")));
+ assertFalse(fs.exists(new Path(newDir + "/2011/10")));
+ assertTrue(fs.exists(new Path(newDir + "/2012/10")));
+ fs.delete(newDir, true);
+ }
+
// Test for mkdir as prepare action
@Test
public void testMkdir() throws Exception {
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java
(original)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsActionExecutor.java
Thu Aug 8 07:46:14 2013
@@ -238,6 +238,32 @@ public class TestFsActionExecutor extend
assertTrue(!fs.exists(path));
ae.delete(context, path);
+
+ }
+
+ public void testDeleteWithGlob() throws Exception {
+
+ FsActionExecutor ae = new FsActionExecutor();
+ FileSystem fs = getFileSystem();
+ Context context = createContext("<fs/>");
+ Path basePath = new Path(getFsTestCaseDir(), "2010");
+ fs.mkdirs(basePath);
+ fs.mkdirs(new Path(basePath, "10"));
+ fs.createNewFile(new Path(basePath + "/10/newfile1"));
+ fs.createNewFile(new Path(basePath + "/10/newfile2"));
+ fs.mkdirs(new Path(basePath, "11"));
+ fs.createNewFile(new Path(basePath + "/11/newfile3"));
+ fs.mkdirs(new Path(basePath, "12"));
+ fs.createNewFile(new Path(basePath + "/12/newfile4"));
+
+ Path globPath = new Path(basePath +"/1{0,1}/*");
+ ae.delete(context, globPath);
+ assertFalse(fs.exists(new Path(basePath + "/10/newfile1")));
+ assertFalse(fs.exists(new Path(basePath + "/10/newfile2")));
+ assertFalse(fs.exists(new Path(basePath + "/11/newfile3")));
+ assertTrue(fs.exists(new Path(basePath + "/12/newfile4")));
+
+ fs.delete(basePath, true);
}
public void testMove() throws Exception {
@@ -293,6 +319,71 @@ public class TestFsActionExecutor extend
assertTrue(fs.exists(target));
}
+ public void testMoveWithGlob() throws Exception {
+ FsActionExecutor ae = new FsActionExecutor();
+ FileSystem fs = getFileSystem();
+ Path source = new Path(getFsTestCaseDir(), "source");
+ Path target = new Path(getFsTestCaseDir(), "target");
+ Context context = createContext("<fs/>");
+
+ // Test simple example of glob
+ fs.mkdirs(source);
+ fs.mkdirs(target);
+ fs.createNewFile(new Path(source,"newfile1"));
+ fs.createNewFile(new Path(source,"newfile2"));
+
+ ae.move(context, new Path(source.toString() + "/*"), target, false);
+ assertTrue(fs.exists(new Path(target, "newfile1")));
+ assertTrue(fs.exists(new Path(target, "newfile2")));
+
+ // Test another example of glob
+ fs.delete(target, true);
+ fs.mkdirs(target);
+ fs.mkdirs(new Path(source + "/2010"));
+ fs.mkdirs(new Path(source + "/2011"));
+ fs.mkdirs(new Path(source + "/2012"));
+ fs.mkdirs(new Path(source + "/2010/10"));
+ fs.mkdirs(new Path(source + "/2010/11"));
+ fs.createNewFile(new Path(source + "/2010/10/newfile1"));
+ fs.createNewFile(new Path(source + "/2010/11/newfile2"));
+ fs.mkdirs(new Path(source + "/2011/09"));
+ fs.mkdirs(new Path(source + "/2011/10"));
+ fs.createNewFile(new Path(source + "/2011/09/newfile3"));
+ fs.createNewFile(new Path(source + "/2011/10/newfile4"));
+
+ ae.move(context, new Path(source.toString() + "/201[0-1]/1{0,1}/*"),
target, false);
+ assertTrue(fs.exists(new Path(target.toString() + "/newfile1")));
+ assertTrue(fs.exists(new Path(target.toString() + "/newfile2")));
+ assertFalse(fs.exists(new Path(target.toString() + "/newfile3")));
+ assertTrue(fs.exists(new Path(target.toString() + "/newfile4")));
+
+ fs.delete(new Path(source + "/2010"), true);
+ fs.delete(new Path(source + "/2011"), true);
+ fs.delete(new Path(source + "/2012"), true);
+
+ // Catch exception when trying to move multiple files (match glob) to
+ // the same name which doesn't exist
+ fs.delete(target, true);
+ try {
+ ae.move(context, new Path(source.toString() + "/*"), target, true);
+ }
+ catch (ActionExecutorException ex) {
+ assertEquals("FS012", ex.getErrorCode());
+ }
+
+ // Catch exception when trying to move multiple files (match glob) to
+ // the same file name which exists
+ fs.delete(target, true);
+ Path targetFile = new Path(target, "newfile1");
+ fs.createNewFile(targetFile);
+ try {
+ ae.move(context, new Path(source.toString() + "/*"), targetFile,
true);
+ }
+ catch (ActionExecutorException ex) {
+ assertEquals("FS012", ex.getErrorCode());
+ }
+ }
+
public void testChmod() throws Exception {
FsActionExecutor ae = new FsActionExecutor();
FileSystem fs = getFileSystem();
@@ -321,6 +412,59 @@ public class TestFsActionExecutor extend
assertEquals("rwx---r--",
fs.getFileStatus(grandchild).getPermission().toString());
}
+ public void testChmodWithGlob() throws Exception {
+ FsActionExecutor ae = new FsActionExecutor();
+ FileSystem fs = getFileSystem();
+ Context context = createContext("<fs/>");
+ Path basePath = new Path(getFsTestCaseDir(), "2010");
+ fs.mkdirs(basePath);
+ fs.mkdirs(new Path(basePath, "10"));
+ fs.mkdirs(new Path(basePath + "/10/dir1"));
+ fs.createNewFile(new Path(basePath + "/10/dir1/file1"));
+ fs.mkdirs(new Path(basePath + "/10/dir2"));
+ fs.mkdirs(new Path(basePath, "11"));
+ fs.mkdirs(new Path(basePath + "/11/dir3"));
+ fs.mkdirs(new Path(basePath, "12"));
+
+ fs.setPermission(new Path(basePath, "10"),
FsPermission.valueOf("-rwxrwxrwx"));
+ fs.setPermission(new Path(basePath + "/10/dir1"),
FsPermission.valueOf("-rwxrwxrwx"));
+ fs.setPermission(new Path(basePath + "/10/dir2"),
FsPermission.valueOf("-rwxrwxrwx"));
+ fs.setPermission(new Path(basePath + "/10/dir1/file1"),
FsPermission.valueOf("-rw-rw-rw-"));
+ fs.setPermission(new Path(basePath, "11"),
FsPermission.valueOf("-rwxrwxrwx"));
+ fs.setPermission(new Path(basePath + "/11/dir3"),
FsPermission.valueOf("-rwxrwxrwx"));
+ fs.setPermission(new Path(basePath, "12"),
FsPermission.valueOf("-rwxrwxrwx"));
+
+ Path globPath = new Path(basePath +"/1[0-1]");
+ ae.chmod(context, globPath, "-rwx------", false, false);
+ assertEquals("rwx------", fs.getFileStatus(new Path(basePath,
"10")).getPermission().toString());
+ assertEquals("rwx------", fs.getFileStatus(new Path(basePath,
"11")).getPermission().toString());
+ assertEquals("rwxrwxrwx", fs.getFileStatus(new Path(basePath,
"12")).getPermission().toString());
+ assertEquals("rwxrwxrwx", fs.getFileStatus(new Path(basePath +
"/10/dir1")).getPermission().toString());
+ assertEquals("rwxrwxrwx", fs.getFileStatus(new Path(basePath +
"/10/dir2")).getPermission().toString());
+ assertEquals("rwxrwxrwx", fs.getFileStatus(new Path(basePath +
"/11/dir3")).getPermission().toString());
+ assertEquals("rw-rw-rw-", fs.getFileStatus(new Path(basePath +
"/10/dir1/file1")).getPermission().toString());
+
+ ae.chmod(context, globPath, "-rwx------", true, false);
+ assertEquals("rwx------", fs.getFileStatus(new Path(basePath,
"10")).getPermission().toString());
+ assertEquals("rwx------", fs.getFileStatus(new Path(basePath,
"11")).getPermission().toString());
+ assertEquals("rwxrwxrwx", fs.getFileStatus(new Path(basePath,
"12")).getPermission().toString());
+ assertEquals("rwx------", fs.getFileStatus(new Path(basePath +
"/10/dir1")).getPermission().toString());
+ assertEquals("rwx------", fs.getFileStatus(new Path(basePath +
"/10/dir2")).getPermission().toString());
+ assertEquals("rwx------", fs.getFileStatus(new Path(basePath +
"/11/dir3")).getPermission().toString());
+ assertEquals("rw-rw-rw-", fs.getFileStatus(new Path(basePath +
"/10/dir1/file1")).getPermission().toString());
+
+ ae.chmod(context, globPath, "-rwx------", true, true);
+ assertEquals("rwx------", fs.getFileStatus(new Path(basePath,
"10")).getPermission().toString());
+ assertEquals("rwx------", fs.getFileStatus(new Path(basePath,
"11")).getPermission().toString());
+ assertEquals("rwxrwxrwx", fs.getFileStatus(new Path(basePath,
"12")).getPermission().toString());
+ assertEquals("rwx------", fs.getFileStatus(new Path(basePath +
"/10/dir1")).getPermission().toString());
+ assertEquals("rwx------", fs.getFileStatus(new Path(basePath +
"/10/dir2")).getPermission().toString());
+ assertEquals("rwx------", fs.getFileStatus(new Path(basePath +
"/11/dir3")).getPermission().toString());
+ assertEquals("rw-------", fs.getFileStatus(new Path(basePath +
"/10/dir1/file1")).getPermission().toString());
+
+ fs.delete(basePath, true);
+ }
+
public void testChmodRecursive() throws Exception {
FsActionExecutor ae = new FsActionExecutor();
FileSystem fs = getFileSystem();
@@ -815,4 +959,61 @@ public void testChmodRecursive() throws
assertEquals(testGroup2,
fs.getFileStatus(child).getGroup().toString());
assertEquals(testGroup2,
fs.getFileStatus(grandchild).getGroup().toString());
}
+
+ public void testChgrpWithGlob() throws Exception {
+
+ String testUser = getTestUser();
+ String testGroup = getTestGroup();
+ String testGroup2 = getTestGroup2();
+ FsActionExecutor ae = new FsActionExecutor();
+ FileSystem fs = getFileSystem();
+ Context context = createContext("<fs/>");
+ Path basePath = new Path(getFsTestCaseDir(), "2010");
+ fs.mkdirs(basePath);
+ fs.mkdirs(new Path(basePath, "10"));
+ fs.mkdirs(new Path(basePath + "/10/dir1"));
+ fs.createNewFile(new Path(basePath + "/10/dir1/file1"));
+ fs.mkdirs(new Path(basePath + "/10/dir2"));
+ fs.mkdirs(new Path(basePath, "11"));
+ fs.mkdirs(new Path(basePath + "/11/dir3"));
+ fs.mkdirs(new Path(basePath, "12"));
+
+ fs.setOwner(new Path(basePath, "10"), testUser, testGroup);
+ fs.setOwner(new Path(basePath + "/10/dir1"), testUser, testGroup);
+ fs.setOwner(new Path(basePath + "/10/dir1/file1"), testUser,
testGroup);
+ fs.setOwner(new Path(basePath + "/10/dir2"), testUser, testGroup);
+ fs.setOwner(new Path(basePath, "11"), testUser, testGroup);
+ fs.setOwner(new Path(basePath + "/11/dir3"), testUser, testGroup);
+ fs.setOwner(new Path(basePath, "12"), testUser, testGroup);
+
+ Path globPath = new Path(basePath +"/1[0-1]");
+ ae.chgrp(context, null, null, globPath, testUser, testGroup2, false,
false);
+ assertEquals(testGroup2, fs.getFileStatus(new Path(basePath,
"10")).getGroup().toString());
+ assertEquals(testGroup2, fs.getFileStatus(new Path(basePath,
"11")).getGroup().toString());
+ assertEquals(testGroup, fs.getFileStatus(new Path(basePath,
"12")).getGroup().toString());
+ assertEquals(testGroup, fs.getFileStatus(new Path(basePath +
"/10/dir1")).getGroup().toString());
+ assertEquals(testGroup, fs.getFileStatus(new Path(basePath +
"/10/dir2")).getGroup().toString());
+ assertEquals(testGroup, fs.getFileStatus(new Path(basePath +
"/11/dir3")).getGroup().toString());
+ assertEquals(testGroup, fs.getFileStatus(new Path(basePath +
"/10/dir1/file1")).getGroup().toString());
+
+ ae.chgrp(context, null, null, globPath, testUser, testGroup2, true,
false);
+ assertEquals(testGroup2, fs.getFileStatus(new Path(basePath,
"10")).getGroup().toString());
+ assertEquals(testGroup2, fs.getFileStatus(new Path(basePath,
"11")).getGroup().toString());
+ assertEquals(testGroup, fs.getFileStatus(new Path(basePath,
"12")).getGroup().toString());
+ assertEquals(testGroup2, fs.getFileStatus(new Path(basePath +
"/10/dir1")).getGroup().toString());
+ assertEquals(testGroup2, fs.getFileStatus(new Path(basePath +
"/10/dir2")).getGroup().toString());
+ assertEquals(testGroup2, fs.getFileStatus(new Path(basePath +
"/11/dir3")).getGroup().toString());
+ assertEquals(testGroup, fs.getFileStatus(new Path(basePath +
"/10/dir1/file1")).getGroup().toString());
+
+ ae.chgrp(context, null, null, globPath, testUser, testGroup2, true,
true);
+ assertEquals(testGroup2, fs.getFileStatus(new Path(basePath,
"10")).getGroup().toString());
+ assertEquals(testGroup2, fs.getFileStatus(new Path(basePath,
"11")).getGroup().toString());
+ assertEquals(testGroup, fs.getFileStatus(new Path(basePath,
"12")).getGroup().toString());
+ assertEquals(testGroup2, fs.getFileStatus(new Path(basePath +
"/10/dir1")).getGroup().toString());
+ assertEquals(testGroup2, fs.getFileStatus(new Path(basePath +
"/10/dir2")).getGroup().toString());
+ assertEquals(testGroup2, fs.getFileStatus(new Path(basePath +
"/11/dir3")).getGroup().toString());
+ assertEquals(testGroup2, fs.getFileStatus(new Path(basePath +
"/10/dir1/file1")).getGroup().toString());
+
+ fs.delete(basePath, true);
+ }
}
Modified: oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
URL:
http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
--- oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki (original)
+++ oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki Thu Aug 8
07:46:14 2013
@@ -1033,6 +1033,8 @@ The FS commands are executed synchronous
file commands are completed before continuing to the next action.
Path names specified in the =fs= action can be parameterized (templatized)
using EL expressions.
+Path name should be specified as an absolute path. In case of =move=, =delete=,
=chmod= and =chgrp= commands, a glob pattern can also be specified instead of
an absolute path.
+For =move=, a glob pattern can only be specified for the source path and not the
target.
Each file path must specify the file system URI; for move operations, the
target must not specify the file system URI.
@@ -1404,6 +1406,7 @@ be assigned to it. The queue name must b
The =prepare= element, if present, indicates a list of paths to delete before
starting the Java application. This should
be used exclusively for directory cleanup or dropping of hcatalog table
partitions for the Java application to be executed.
+In case of =delete=, a glob pattern can be used to specify the path.
The format to specify a hcatalog table partition URI is
hcat://[metastore server]:[port]/[database name]/[table
name]/[partkey1]=[value];[partkey2]=[value].
In case of a hcatalog URI, the hive-site.xml needs to be shipped using =file=
tag and the hcatalog and hive jars
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Thu Aug 8 07:46:14 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1471 Support glob in FS action and prepare blocks (ryota)
OOZIE-1403 forkjoin validation blocks some valid cases involving decision
nodes (rkanter)
OOZIE-1449 Coordinator Workflow parent relationship is broken for purge
service (rkanter)
OOZIE-1458 If a Credentials type is not defined, Oozie should say something
(rkanter)
Modified:
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
---
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
(original)
+++
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
Thu Aug 8 07:46:14 2013
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
public class FSLauncherURIHandler implements LauncherURIHandler {
@@ -54,14 +55,23 @@ public class FSLauncherURIHandler implem
boolean status = false;
try {
FileSystem fs = FileSystem.get(uri, conf);
- Path path = getNormalizedPath(uri);
- if (fs.exists(path)) {
- status = fs.delete(path, true);
- if (status) {
- System.out.println("Deletion of path " + path + "
succeeded.");
+ Path[] pathArr =
FileUtil.stat2Paths(fs.globStatus(getNormalizedPath(uri)));
+ if (pathArr != null && pathArr.length > 0) {
+ int fsGlobMax =
conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, 1000);
+ if (pathArr.length > fsGlobMax) {
+ throw new LauncherException("exceeds max number (" +
fsGlobMax
+ + ") of files/dirs to delete in <prepare>");
}
- else {
- System.out.println("Deletion of path " + path + "
failed.");
+ for (Path path : pathArr) {
+ if (fs.exists(path)) {
+ status = fs.delete(path, true);
+ if (status) {
+ System.out.println("Deletion of path " + path + "
succeeded.");
+ }
+ else {
+ System.out.println("Deletion of path " + path + "
failed.");
+ }
+ }
}
}
}
Modified:
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
---
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
(original)
+++
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
Thu Aug 8 07:46:14 2013
@@ -55,6 +55,8 @@ public class LauncherMapper<K1, V1, K2,
static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT =
"oozie.action.main.arg.count";
static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX =
"oozie.action.main.arg.";
static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE =
"oozie.external.stats.max.size";
+ static final String CONF_OOZIE_ACTION_FS_GLOB_MAX =
"oozie.action.fs.glob.max";
+ static final int GLOB_MAX_DEFAULT = 1000;
static final String COUNTER_GROUP = "oozie.launcher";
static final String COUNTER_DO_ID_SWAP = "oozie.do.id.swap";
Modified:
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java?rev=1511607&r1=1511606&r2=1511607&view=diff
==============================================================================
---
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
(original)
+++
oozie/trunk/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
Thu Aug 8 07:46:14 2013
@@ -21,9 +21,9 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
-import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.xml.sax.SAXException;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
@@ -60,8 +60,9 @@ public class PrepareActionsDriver {
if (n.getAttributes() == null ||
n.getAttributes().getNamedItem("path") == null) {
continue;
}
- String path =
n.getAttributes().getNamedItem("path").getNodeValue().trim();
- URI uri = new URI(path);
+ String pathStr =
n.getAttributes().getNamedItem("path").getNodeValue().trim();
+ // use Path to avoid URIsyntax error caused by square bracket
in glob
+ URI uri = new Path(pathStr).toUri();
LauncherURIHandler handler = factory.getURIHandler(uri);
execute(operation, uri, handler, conf);
}
@@ -71,7 +72,7 @@ public class PrepareActionsDriver {
throw new LauncherException(saxe.getMessage(), saxe);
} catch (ParserConfigurationException pce) {
throw new LauncherException(pce.getMessage(), pce);
- } catch (URISyntaxException use) {
+ } catch (IllegalArgumentException use) {
throw new LauncherException(use.getMessage(), use);
}
}