Repository: falcon
Updated Branches:
  refs/heads/master ade3079bf -> c2f7a2a5e


FALCON-1852 Make optional input to a process truly optional

The following changes have been made:
1. Create an "empty dir" under the staging path of a cluster (during cluster 
creation).
2. Modify OozieELExtensions to look for the availability flag and use only 
those instances that have the entire dataset. Use the "empty dir" when no 
instances resolve.
3. Update UTs and ITs for additional validation.

Tested manually with and without availabilityFlag supplied.

Author: Pallavi Rao <[email protected]>

Reviewers: @sandeepSamudrala , @sriksun, @pavankumar526, @ajayyadava

Closes #73 from pallavi-rao/1852 and squashes the following commits:

7e087ed [Pallavi Rao] FALCON-1852 Documentation update for optional feeds
fc9c490 [Pallavi Rao] FALCON-1852 Added mockito to the pom
92ad20c [Pallavi Rao] FALCON-1852 Added UTs
533e221 [Pallavi Rao] FALCON-1852 Make optional input to a process truly 
optional


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c2f7a2a5
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c2f7a2a5
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c2f7a2a5

Branch: refs/heads/master
Commit: c2f7a2a5e925ce790391a9d718decd5c942a59b3
Parents: ade3079
Author: Pallavi Rao <[email protected]>
Authored: Tue Mar 22 12:45:40 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Tue Mar 22 12:45:40 2016 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/entity/ClusterHelper.java |   7 +-
 .../entity/parser/ClusterEntityParser.java      |   4 +
 .../falcon/hadoop/HadoopClientFactory.java      |   2 +
 .../entity/parser/ClusterEntityParserTest.java  |   9 +-
 docs/src/site/twiki/EntitySpecification.twiki   |   3 +-
 oozie-el-extensions/pom.xml                     |   5 +
 .../oozie/extensions/OozieELExtensions.java     |  47 +++++++--
 .../oozie/extensions/TestOozieELExtensions.java | 101 +++++++++++++++++--
 .../oozie/process/ProcessBundleBuilder.java     |   6 +-
 .../ProcessExecutionCoordinatorBuilder.java     |  20 ++--
 .../falcon/resource/EntityManagerJerseyIT.java  |   2 +
 11 files changed, 177 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java 
b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 41c9369..24ba7d7 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -42,7 +42,7 @@ public final class ClusterHelper {
     public static final String DEFAULT_BROKER_IMPL_CLASS = 
"org.apache.activemq.ActiveMQConnectionFactory";
     public static final String WORKINGDIR = "working";
     public static final String NO_USER_BROKER_URL = "NA";
-
+    public static final String EMPTY_DIR_NAME = "EMPTY_DIR_DONT_DELETE";
 
 
     private ClusterHelper() {
@@ -192,4 +192,9 @@ public final class ClusterHelper {
         }
         return null;
     }
+
+    /**
+     * Builds the location of the placeholder empty directory of a cluster:
+     * the cluster's storage URL, followed by the path of its STAGING location,
+     * followed by "/" and {@link #EMPTY_DIR_NAME}.
+     *
+     * @param cluster cluster entity whose storage URL and staging location are used
+     * @return the empty directory location as a string
+     */
+    public static String getEmptyDir(Cluster cluster) {
+        StringBuilder emptyDir = new StringBuilder();
+        emptyDir.append(getStorageUrl(cluster));
+        emptyDir.append(getLocation(cluster, ClusterLocationType.STAGING).getPath());
+        emptyDir.append("/").append(EMPTY_DIR_NAME);
+        return emptyDir.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java 
b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index bef4b39..87db536 100644
--- 
a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ 
b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -301,6 +301,10 @@ public class ClusterEntityParser extends 
EntityParser<Cluster> {
                     "falcon/workflows/feed", 
HadoopClientFactory.ALL_PERMISSION);
             createStagingSubdirs(fs, cluster, stagingLocation,
                     "falcon/workflows/process", 
HadoopClientFactory.ALL_PERMISSION);
+
+            // Create empty dirs for optional input
+            createStagingSubdirs(fs, cluster, stagingLocation,
+                    ClusterHelper.EMPTY_DIR_NAME, 
HadoopClientFactory.READ_ONLY_PERMISSION);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java 
b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
index e33d353..e970439 100644
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -57,6 +57,8 @@ public final class HadoopClientFactory {
             new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
 
     private static final HadoopClientFactory INSTANCE = new 
HadoopClientFactory();
+    public static final FsPermission READ_ONLY_PERMISSION =
+            new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ);
 
     private HadoopClientFactory() {
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git 
a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
 
b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index f98b6e4..c45909f 100644
--- 
a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ 
b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -347,8 +347,8 @@ public class ClusterEntityParserTest extends 
AbstractTestBase {
         
Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster);
         
Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster);
         
Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster);
-        this.dfsCluster.getFileSystem().mkdirs(new 
Path(ClusterHelper.getLocation(cluster,
-                ClusterLocationType.STAGING).getPath()), 
HadoopClientFactory.ALL_PERMISSION);
+        String stagingPath = ClusterHelper.getLocation(cluster, 
ClusterLocationType.STAGING).getPath();
+        this.dfsCluster.getFileSystem().mkdirs(new Path(stagingPath), 
HadoopClientFactory.ALL_PERMISSION);
         clusterEntityParser.validate(cluster);
         String workingDirPath = 
cluster.getLocations().getLocations().get(0).getPath() + "/working";
         Assert.assertEquals(ClusterHelper.getLocation(cluster, 
ClusterLocationType.WORKING).getPath(), workingDirPath);
@@ -357,6 +357,11 @@ public class ClusterEntityParserTest extends 
AbstractTestBase {
         Assert.assertEquals(workingDirStatus.getPermission(), 
HadoopClientFactory.READ_EXECUTE_PERMISSION);
         Assert.assertEquals(workingDirStatus.getOwner(), 
UserGroupInformation.getLoginUser().getShortUserName());
 
+        FileStatus emptyDirStatus = 
this.dfsCluster.getFileSystem().getFileStatus(new Path(stagingPath
+                + "/" + ClusterHelper.EMPTY_DIR_NAME));
+        Assert.assertEquals(emptyDirStatus.getPermission(), 
HadoopClientFactory.READ_ONLY_PERMISSION);
+        Assert.assertEquals(emptyDirStatus.getOwner(), 
UserGroupInformation.getLoginUser().getShortUserName());
+
         String stagingSubdirFeed = 
cluster.getLocations().getLocations().get(0).getPath() + 
"/falcon/workflows/feed";
         String stagingSubdirProcess =
                 cluster.getLocations().getLocations().get(0).getPath() + 
"/falcon/workflows/process";

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki 
b/docs/src/site/twiki/EntitySpecification.twiki
index d08c3a3..7eedf87 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -682,8 +682,7 @@ Example workflow configuration:
 
 
 ---+++ Optional Inputs
-User can mention one or more inputs as optional inputs. In such cases the job 
does not wait on those inputs which are
-mentioned as optional. If they are present it considers them otherwise 
continue with the compulsory ones.
+User can mention one or more inputs as optional inputs. In such cases the job 
does not wait on those inputs which are mentioned as optional. If they are 
present, it considers them; otherwise, it continues with the mandatory ones. If 
only some instances of the optional feed are present for the given data window, 
those are considered and passed on to the process. While checking for presence 
of a feed instance, Falcon looks for the __availabilityFlag__ in the instance 
directory, if one is specified in the feed definition. If no 
__availabilityFlag__ is specified, presence of the instance directory is 
treated as an indication of availability of data.
 Example:
 <verbatim>
 <feed name="feed1">

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/oozie-el-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/pom.xml b/oozie-el-extensions/pom.xml
index e8c1830..d0f753d 100644
--- a/oozie-el-extensions/pom.xml
+++ b/oozie-el-extensions/pom.xml
@@ -82,6 +82,11 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
----------------------------------------------------------------------
diff --git 
a/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
 
b/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
index 167afde..a6ff487 100644
--- 
a/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
+++ 
b/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
@@ -20,11 +20,14 @@ package org.apache.oozie.extensions;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.command.coord.CoordCommandUtils;
 import org.apache.oozie.coord.CoordELEvaluator;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.coord.SyncCoordAction;
 import org.apache.oozie.coord.SyncCoordDataset;
+import org.apache.oozie.dependency.ActionDependency;
+import org.apache.oozie.dependency.DependencyChecker;
 import org.apache.oozie.util.ELEvaluator;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
@@ -34,8 +37,10 @@ import org.jdom.Text;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.List;
 import java.util.TimeZone;
 
 /**
@@ -72,15 +77,31 @@ public final class OozieELExtensions {
         //optional input
         if (uristr == null) {
             Element dsEle = getDSElement(eval, dataInName);
-            Configuration conf = new Configuration();
             SyncCoordAction appInst = (SyncCoordAction) 
eval.getVariable(CoordELFunctions.COORD_ACTION);
+
+            Configuration conf = new Configuration();
+            conf.set(OozieClient.USER_NAME, 
(String)eval.getVariable(OozieClient.USER_NAME));
             try {
                 ELEvaluator instEval = 
CoordELEvaluator.createInstancesELEvaluator(dsEle, appInst, conf);
                 StringBuilder instances = new StringBuilder();
-                CoordCommandUtils.resolveInstanceRange(dsEle, instances , 
appInst, conf, instEval);
-                uristr = CoordCommandUtils.createEarlyURIs(dsEle, 
instances.toString(),
-                        new StringBuilder(), new StringBuilder());
-                uristr = uristr.replace(CoordELFunctions.INSTANCE_SEPARATOR, 
",");
+                StringBuilder urisWithDoneFlag = new StringBuilder();
+                CoordCommandUtils.resolveInstanceRange(dsEle, instances, 
appInst, conf, instEval);
+                CoordCommandUtils.createEarlyURIs(dsEle, instances.toString(),
+                        new StringBuilder(), urisWithDoneFlag);
+                XLog.getLog(OozieELExtensions.class).debug("Resolved instances 
for " + dataInName + " : "
+                        + urisWithDoneFlag.toString());
+                // Check if availability flags are present for each instance.
+                ActionDependency actionDep = 
DependencyChecker.checkForAvailability(urisWithDoneFlag.toString(),
+                        conf, false);
+                String doneFlag = (String) eval.getVariable(dataInName + 
".done-flag");
+                uristr = 
StringUtils.join(stripDoneFlag(actionDep.getAvailableDependencies(), doneFlag), 
",");
+                // If no instances are present, point the optional input to 
empty dir.
+                if (StringUtils.isEmpty(uristr)) {
+                    String emptyDir = (String) eval.getVariable(dataInName + 
".empty-dir");
+                    XLog.getLog(OozieELExtensions.class).debug("No instances 
could be resolved. Passing empty dir : "
+                            + emptyDir);
+                    uristr = emptyDir;
+                }
             } catch (Exception e) {
                 throw new RuntimeException("Failed to resolve instance range 
for " + dataInName, e);
             }
@@ -108,6 +129,17 @@ public final class OozieELExtensions {
         return uristr;
     }
 
+    /**
+     * Removes the trailing "/" + doneFlag suffix from each available dependency URI.
+     *
+     * @param availableDependencies dependency URIs, possibly ending with the done flag
+     * @param doneFlag done-flag file name; when null or empty the input list is returned as is
+     * @return a new list with the suffix removed from every entry that ends with it,
+     *         or the input list itself when no done flag is given
+     */
+    private static List<String> stripDoneFlag(List<String> availableDependencies, String doneFlag) {
+        if (StringUtils.isEmpty(doneFlag)) {
+            return availableDependencies;
+        }
+        String doneFlagSuffix = "/" + doneFlag;
+        List<String> strippedAvailableDeps = new ArrayList<>(availableDependencies.size());
+        for (String availableDep : availableDependencies) {
+            // removeEnd drops only the exact suffix. stripEnd treats its second argument as a set of
+            // characters and would also eat path characters, e.g. the "US" in ".../US/_SUCCESS".
+            strippedAvailableDeps.add(StringUtils.removeEnd(availableDep, doneFlagSuffix));
+        }
+        return strippedAvailableDeps;
+    }
+
     private static Element getDSElement(ELEvaluator eval, String dataInName) {
         Element ele = new Element("datain");
         Element dsEle = new Element("dataset");
@@ -121,7 +153,10 @@ public final class OozieELExtensions {
         String[] children = {"done-flag", "uri-template"};
         for (String child : children) {
             Element childEle = new Element(child);
-            childEle.setContent(new Text(((String) eval.getVariable(dataInName 
+ "." + child)).replace('%', '$')));
+            String text = (String) eval.getVariable(dataInName + "." + child);
+            if (text != null) {
+                childEle.setContent(new Text(text.replace('%', '$')));
+            }
             dsEle.getChildren().add(childEle);
         }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
----------------------------------------------------------------------
diff --git 
a/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
 
b/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
index c99a2b3..b9bf594 100644
--- 
a/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
+++ 
b/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
@@ -19,6 +19,10 @@
 package org.apache.oozie.extensions;
 
 import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
@@ -27,13 +31,23 @@ import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.coord.SyncCoordAction;
 import org.apache.oozie.coord.SyncCoordDataset;
 import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.dependency.FSURIHandler;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.dependency.URIHandlerException;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.ELService;
+import org.apache.oozie.service.Service;
+import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ELEvaluator;
+
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
@@ -122,23 +136,47 @@ public class TestOozieELExtensions {
         String expuris =
                 
"hdfs://localhost:8020/clicks/2009/09/02/10/*/US,hdfs://localhost:8020/clicks/2009/09/02/09/*/US";
         Assert.assertEquals(expuris, CoordELFunctions.evalAndWrap(eval, 
"${dataIn('clicks', '*/US')}"));
+    }
 
-        //test optional input
+    @Test(dataProvider = "optionalDatasets")
+    public void testDataInOptional(String expuris, String partition, String 
doneFlag) throws Exception {
+        ELEvaluator eval = createActionStartEvaluator();
         String inName = "clicks";
+        eval.setVariable(".datain.clicks", null);
+        Services.get().setService(DummyURIHandlerService.class);
+
         SyncCoordDataset ds = createDataSet("2007-09-30T010:00Z");
         eval.setVariable(inName + ".frequency", 
String.valueOf(ds.getFrequency()));
         eval.setVariable(inName + ".freq_timeunit", ds.getTimeUnit().name());
         eval.setVariable(inName + ".timezone", ds.getTimeZone().getID());
         eval.setVariable(inName + ".end_of_duration", Timeunit.NONE.name());
         eval.setVariable(inName + ".initial-instance", 
OozieELExtensions.formatDateUTC(ds.getInitInstance()));
-        eval.setVariable(inName + ".done-flag", "notused");
+        eval.setVariable(inName + ".done-flag", doneFlag);
         eval.setVariable(inName + ".uri-template", ds.getUriTemplate());
         eval.setVariable(inName + ".start-instance", "now(-1,0)");
         eval.setVariable(inName + ".end-instance", "now(0,0)");
-        // TODO Had to comment this out for this test to PASS else NPE in
-        // TODO 
org.apache.oozie.command.coord.CoordCommandUtils.createEarlyURIs(CoordCommandUtils.java:359)
-        // eval.setVariable(".datain.clicks", null);
-        Assert.assertEquals(expuris, CoordELFunctions.evalAndWrap(eval, 
"${dataIn('clicks', '*/US')}"));
+        eval.setVariable(OozieClient.USER_NAME, "test");
+        eval.setVariable(inName + ".empty-dir", 
"hdfs://localhost:8020/projects/falcon/staging/EMPTY_DIR_DONT_DELETE");
+        Assert.assertEquals(CoordELFunctions.evalAndWrap(eval, 
"${dataIn('clicks', '" + partition + "')}"), expuris);
+    }
+
+    @DataProvider(name = "optionalDatasets")
+    public Object[][] getOptionalDatasets() {
+        return new Object[][] {
+            // With partitions and availability flag. All instances available.
+            
{"hdfs://localhost:8020/clicks/2009/09/02/10/*/US,hdfs://localhost:8020/clicks/2009/09/02/09/*/US",
+                "*/US", "_DONE", },
+            // With availability flag. All instances missing
+            
{"hdfs://localhost:8020/projects/falcon/staging/EMPTY_DIR_DONT_DELETE", "null", 
"_FINISH"},
+            // No availability flag. One instance missing
+            {"hdfs://localhost:8020/clicks/2009/09/02/09", "null", ""},
+            // With availability flag. One instance missing.
+            {"hdfs://localhost:8020/clicks/2009/09/02/10", "null", "_SUCCESS"},
+            // No availability flag and partition. One instance missing
+            {"hdfs://localhost:8020/clicks/2009/09/02/09/US", "US", ""},
+            // With availability flag and partition. One instance missing.
+            {"hdfs://localhost:8020/clicks/2009/09/02/10/US", "US", 
"_SUCCESS"},
+        };
     }
 
     @Test
@@ -309,4 +347,55 @@ public class TestOozieELExtensions {
         CoordELFunctions.configureEvaluator(eval, null, appInst);
         return eval;
     }
+
+    /**
+     * A mock URIHandlerService that simulates availability of data as per testcase requirement.
+     * Every URI is served by one shared Mockito mock of FSURIHandler.
+     */
+    private static class DummyURIHandlerService extends URIHandlerService {
+        private final URIHandler mockHandler = Mockito.mock(FSURIHandler.class);
+
+        /**
+         * Stubs the mock handler. super.init is not called.
+         *
+         * @param services ignored by this mock
+         * @throws ServiceException wrapping any URIHandlerException raised while stubbing
+         */
+        @Override
+        public void init(Services services) throws ServiceException {
+            try {
+                // exists() answers true only when the URI is accepted by URIMatcher and the last
+                // argument matches "test"; any other call gets Mockito's default answer.
+                Mockito.when(mockHandler.exists((URI) Mockito.argThat(new URIMatcher()),
+                        Mockito.any(Configuration.class), Mockito.matches("test"))).thenReturn(true);
+                // Done-flag URI construction is delegated to the real implementation.
+                Mockito.when(mockHandler.getURIWithDoneFlag(Mockito.anyString(),
+                        Mockito.anyString())).thenCallRealMethod();
+            } catch (URIHandlerException e) {
+                throw new ServiceException(e);
+            }
+        }
+
+        @Override
+        public void destroy() {
+            // Nothing to release: the only state is a Mockito mock.
+        }
+
+        /** Returns the shared mock handler regardless of the URI. */
+        @Override
+        public URIHandler getURIHandler(URI uri) {
+            return mockHandler;
+        }
+
+        @Override
+        public Class<? extends Service> getInterface() {
+            return URIHandlerService.class;
+        }
+    }
+
+    /**
+     * Mockito argument matcher that accepts only a fixed list of URIs, i.e. the
+     * locations the mocked handler reports as existing.
+     */
+    private static class URIMatcher extends ArgumentMatcher<URI> {
+        private final List<URI> availableURIs = new ArrayList<>();
+
+        public URIMatcher() {
+            try {
+                availableURIs.add(new URI("hdfs://localhost:8020/clicks/2009/09/02/10/_DONE"));
+                availableURIs.add(new URI("hdfs://localhost:8020/clicks/2009/09/02/09/_DONE"));
+                availableURIs.add(new URI("hdfs://localhost:8020/clicks/2009/09/02/10/_SUCCESS"));
+                availableURIs.add(new URI("hdfs://localhost:8020/clicks/2009/09/02/09"));
+            } catch (URISyntaxException e) {
+                // Fail loudly instead of continuing with a partially filled list.
+                throw new IllegalStateException("Malformed URI in test fixture", e);
+            }
+        }
+
+        /**
+         * @param o argument passed to the mocked method
+         * @return true when the argument equals one of the listed URIs
+         */
+        @Override
+        public boolean matches(Object o) {
+            return availableURIs.contains(o);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java 
b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
index 806810e..6661dd5 100644
--- 
a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
+++ 
b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -20,6 +20,7 @@ package org.apache.falcon.oozie.process;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.v0.EntityType;
@@ -65,12 +66,13 @@ public class ProcessBundleBuilder extends 
OozieBundleBuilder<Process> {
                     properties.put(inName + ".end_of_duration", 
Timeunit.NONE.name());
                     properties.put(inName + ".initial-instance",
                         
SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
-                    properties.put(inName + ".done-flag", "notused");
+                    String doneFlag = feed.getAvailabilityFlag();
+                    properties.put(inName + ".done-flag", (doneFlag == null)? 
"" : doneFlag);
 
                     String locPath = 
FeedHelper.createStorage(cluster.getName(), feed)
                         .getUriTemplate(LocationType.DATA).replace('$', '%');
                     properties.put(inName + ".uri-template", locPath);
-
+                    properties.put(inName + ".empty-dir", 
ClusterHelper.getEmptyDir(cluster));
                     properties.put(inName + ".start-instance", in.getStart());
                     properties.put(inName + ".end-instance", in.getEnd());
                 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
 
b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index f5c9948..91f4757 100644
--- 
a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ 
b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -158,20 +158,20 @@ public class ProcessExecutionCoordinatorBuilder extends 
OozieCoordinatorBuilder<
             Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
             Storage storage = FeedHelper.createStorage(cluster, feed);
 
+            if (coord.getDatasets() == null) {
+                coord.setDatasets(new DATASETS());
+            }
+
+            SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, 
input.getName(), LocationType.DATA);
+            if (syncdataset == null) {
+                return;
+            }
+            coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
+
             if (!input.isOptional()) {
-                if (coord.getDatasets() == null) {
-                    coord.setDatasets(new DATASETS());
-                }
                 if (coord.getInputEvents() == null) {
                     coord.setInputEvents(new INPUTEVENTS());
                 }
-
-                SYNCDATASET syncdataset = createDataSet(feed, cluster, 
storage, input.getName(), LocationType.DATA);
-                if (syncdataset == null) {
-                    return;
-                }
-                
coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
-
                 DATAIN datain = createDataIn(input);
                 coord.getInputEvents().getDataIn().add(datain);
             }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java 
b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index f336422..b6553e9 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -198,6 +198,8 @@ public class EntityManagerJerseyIT extends 
AbstractSchedulerManagerJerseyIT {
         File tmpFile = TestContext.getTempFile();
         EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
         schedule(context);
+        waitForStatus(EntityType.PROCESS.name(), context.getProcessName(), 
START_INSTANCE,
+                InstancesResult.WorkflowStatus.SUCCEEDED);
     }
 
     public void testDryRun() throws Exception {

Reply via email to