Repository: falcon
Updated Branches:
  refs/heads/master 8cc681075 -> d6965213a


http://git-wip-us.apache.org/repos/asf/falcon/blob/d6965213/client/src/main/java/org/apache/falcon/cli/FalconInstanceCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconInstanceCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconInstanceCLI.java
new file mode 100644
index 0000000..24be7f1
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/cli/FalconInstanceCLI.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.ResponseHelper;
+import org.apache.falcon.client.FalconCLIException;
+import org.apache.falcon.client.FalconClient;
+import org.apache.falcon.resource.InstanceDependencyResult;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Instance extension to Falcon Command Line Interface - wraps the RESTful API for instances.
+ */
+public class FalconInstanceCLI extends FalconCLI {
+
+    private static final String FORCE_RERUN_FLAG = "force";
+    private static final String INSTANCE_TIME_OPT = "instanceTime";
+    private static final String RUNNING_OPT = "running";
+    private static final String KILL_OPT = "kill";
+    private static final String RERUN_OPT = "rerun";
+    private static final String LOG_OPT = "logs";
+    private static final String RUNID_OPT = "runid";
+    private static final String CLUSTERS_OPT = "clusters";
+    private static final String SOURCECLUSTER_OPT = "sourceClusters";
+    private static final String LIFECYCLE_OPT = "lifecycle";
+    private static final String PARARMS_OPT = "params";
+    private static final String LISTING_OPT = "listing";
+    private static final String TRIAGE_OPT = "triage";
+
+    public FalconInstanceCLI() throws Exception {
+        super();
+    }
+
+    public Options createInstanceOptions() {
+
+        Options instanceOptions = new Options();
+
+        Option running = new Option(RUNNING_OPT, false,
+                "Gets running process instances for a given process");
+        Option list = new Option(LIST_OPT, false,
+                "Gets all instances for a given process in the range start 
time and optional end time");
+        Option status = new Option(STATUS_OPT, false,
+                "Gets status of process instances for a given process in the 
range start time and optional end time");
+        Option summary = new Option(SUMMARY_OPT, false,
+                "Gets summary of instances for a given process in the range 
start time and optional end time");
+        Option kill = new Option(KILL_OPT, false,
+                "Kills active process instances for a given process in the 
range start time and optional end time");
+        Option suspend = new Option(SUSPEND_OPT, false,
+                "Suspends active process instances for a given process in the 
range start time and optional end time");
+        Option resume = new Option(RESUME_OPT, false,
+                "Resumes suspended process instances for a given process "
+                        + "in the range start time and optional end time");
+        Option rerun = new Option(RERUN_OPT, false,
+                "Reruns process instances for a given process in the range 
start time and "
+                        + "optional end time and overrides properties present 
in job.properties file");
+        Option logs = new Option(LOG_OPT, false,
+                "Logs print the logs for process instances for a given process 
in "
+                        + "the range start time and optional end time");
+        Option params = new Option(PARARMS_OPT, false,
+                "Displays the workflow parameters for a given instance of 
specified nominal time"
+                        + "start time represents nominal time and end time is 
not considered");
+        Option listing = new Option(LISTING_OPT, false,
+                "Displays feed listing and their status between a start and 
end time range.");
+        Option dependency = new Option(DEPENDENCY_OPT, false,
+                "Displays dependent instances for a specified instance.");
+        Option triage = new Option(TRIAGE_OPT, false,
+                "Triage a feed or process instance and find the failures in 
it's lineage.");
+
+        OptionGroup group = new OptionGroup();
+        group.addOption(running);
+        group.addOption(list);
+        group.addOption(status);
+        group.addOption(summary);
+        group.addOption(kill);
+        group.addOption(suspend);
+        group.addOption(resume);
+        group.addOption(rerun);
+        group.addOption(logs);
+        group.addOption(params);
+        group.addOption(listing);
+        group.addOption(dependency);
+        group.addOption(triage);
+
+        Option url = new Option(URL_OPTION, true, "Falcon URL");
+        Option start = new Option(START_OPT, true,
+                "Start time is required for commands, status, kill, suspend, 
resume and re-run"
+                        + "and it is nominal time while displaying workflow 
params");
+        Option end = new Option(END_OPT, true,
+                "End time is optional for commands, status, kill, suspend, 
resume and re-run; "
+                        + "if not specified then current time is considered as 
end time");
+        Option runid = new Option(RUNID_OPT, true,
+                "Instance runid  is optional and user can specify the runid, 
defaults to 0");
+        Option clusters = new Option(CLUSTERS_OPT, true,
+                "clusters is optional for commands kill, suspend and resume, "
+                        + "should not be specified for other commands");
+        Option sourceClusters = new Option(SOURCECLUSTER_OPT, true,
+                " source cluster is optional for commands kill, suspend and 
resume, "
+                        + "should not be specified for other commands 
(required for only feed)");
+        Option filePath = new Option(FILE_PATH_OPT, true,
+                "Path to job.properties file is required for rerun command, "
+                        + "it should contain name=value pair for properties to 
override for rerun");
+        Option entityType = new Option(TYPE_OPT, true,
+                "Entity type, can be feed or process xml");
+        Option entityName = new Option(ENTITY_NAME_OPT, true,
+                "Entity name, can be feed or process name");
+        Option colo = new Option(COLO_OPT, true,
+                "Colo on which the cmd has to be executed");
+        Option lifecycle = new Option(LIFECYCLE_OPT, true,
+                "describes life cycle of entity , for feed it can be 
replication/retention "
+                        + "and for process it can be execution");
+        Option filterBy = new Option(FILTER_BY_OPT, true,
+                "Filter returned instances by the specified fields");
+        Option orderBy = new Option(ORDER_BY_OPT, true,
+                "Order returned instances by this field");
+        Option sortOrder = new Option(SORT_ORDER_OPT, true, "asc or desc order for results");
+        Option offset = new Option(OFFSET_OPT, true,
+                "Start returning instances from this offset");
+        Option numResults = new Option(NUM_RESULTS_OPT, true,
+                "Number of results to return per request");
+        Option forceRerun = new Option(FORCE_RERUN_FLAG, false,
+                "Flag to forcefully rerun entire workflow of an instance");
+        Option doAs = new Option(DO_AS_OPT, true, "doAs user");
+
+        Option instanceTime = new Option(INSTANCE_TIME_OPT, true, "Time for an instance");
+
+        instanceOptions.addOption(url);
+        instanceOptions.addOptionGroup(group);
+        instanceOptions.addOption(start);
+        instanceOptions.addOption(end);
+        instanceOptions.addOption(filePath);
+        instanceOptions.addOption(entityType);
+        instanceOptions.addOption(entityName);
+        instanceOptions.addOption(runid);
+        instanceOptions.addOption(clusters);
+        instanceOptions.addOption(sourceClusters);
+        instanceOptions.addOption(colo);
+        instanceOptions.addOption(lifecycle);
+        instanceOptions.addOption(filterBy);
+        instanceOptions.addOption(offset);
+        instanceOptions.addOption(orderBy);
+        instanceOptions.addOption(sortOrder);
+        instanceOptions.addOption(numResults);
+        instanceOptions.addOption(forceRerun);
+        instanceOptions.addOption(doAs);
+        instanceOptions.addOption(instanceTime);
+
+        return instanceOptions;
+    }
+
+    public void instanceCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException, IOException {
+        Set<String> optionsList = new HashSet<String>();
+        for (Option option : commandLine.getOptions()) {
+            optionsList.add(option.getOpt());
+        }
+
+        String result;
+        String type = commandLine.getOptionValue(TYPE_OPT);
+        String entity = commandLine.getOptionValue(ENTITY_NAME_OPT);
+        String instanceTime = commandLine.getOptionValue(INSTANCE_TIME_OPT);
+        String start = commandLine.getOptionValue(START_OPT);
+        String end = commandLine.getOptionValue(END_OPT);
+        String filePath = commandLine.getOptionValue(FILE_PATH_OPT);
+        String runId = commandLine.getOptionValue(RUNID_OPT);
+        String colo = commandLine.getOptionValue(COLO_OPT);
+        String clusters = commandLine.getOptionValue(CLUSTERS_OPT);
+        String sourceClusters = commandLine.getOptionValue(SOURCECLUSTER_OPT);
+        List<LifeCycle> lifeCycles = getLifeCycle(commandLine.getOptionValue(LIFECYCLE_OPT));
+        String filterBy = commandLine.getOptionValue(FILTER_BY_OPT);
+        String orderBy = commandLine.getOptionValue(ORDER_BY_OPT);
+        String sortOrder = commandLine.getOptionValue(SORT_ORDER_OPT);
+        String doAsUser = commandLine.getOptionValue(DO_AS_OPT);
+        Integer offset = parseIntegerInput(commandLine.getOptionValue(OFFSET_OPT), 0, "offset");
+        Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT), null, "numResults");
+
+        colo = getColo(colo);
+        String instanceAction = "instance";
+        validateSortOrder(sortOrder);
+        validateInstanceCommands(optionsList, entity, type, colo);
+
+        if (optionsList.contains(TRIAGE_OPT)) {
+            validateNotEmpty(colo, COLO_OPT);
+            validateNotEmpty(start, START_OPT);
+            validateNotEmpty(type, TYPE_OPT);
+            validateEntityTypeForSummary(type);
+            validateNotEmpty(entity, ENTITY_NAME_OPT);
+            result = client.triage(type, entity, start, colo).toString();
+        } else if (optionsList.contains(DEPENDENCY_OPT)) {
+            validateNotEmpty(instanceTime, INSTANCE_TIME_OPT);
+            InstanceDependencyResult response = client.getInstanceDependencies(type, entity, instanceTime, colo);
+            result = ResponseHelper.getString(response);
+
+        } else if (optionsList.contains(RUNNING_OPT)) {
+            validateOrderBy(orderBy, instanceAction);
+            validateFilterBy(filterBy, instanceAction);
+            result = ResponseHelper.getString(client.getRunningInstances(type,
+                    entity, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, doAsUser));
+        } else if (optionsList.contains(STATUS_OPT) || optionsList.contains(LIST_OPT)) {
+            validateOrderBy(orderBy, instanceAction);
+            validateFilterBy(filterBy, instanceAction);
+            result = ResponseHelper.getString(client.getStatusOfInstances(type, entity, start, end, colo,
+                    lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, doAsUser));
+        } else if (optionsList.contains(SUMMARY_OPT)) {
+            validateOrderBy(orderBy, "summary");
+            validateFilterBy(filterBy, "summary");
+            result = ResponseHelper.getString(client.getSummaryOfInstances(type, entity, start, end, colo,
+                    lifeCycles, filterBy, orderBy, sortOrder, doAsUser));
+        } else if (optionsList.contains(KILL_OPT)) {
+            validateNotEmpty(start, START_OPT);
+            validateNotEmpty(end, END_OPT);
+            result = ResponseHelper.getString(client.killInstances(type, entity, start, end, colo, clusters,
+                    sourceClusters, lifeCycles, doAsUser));
+        } else if (optionsList.contains(SUSPEND_OPT)) {
+            validateNotEmpty(start, START_OPT);
+            validateNotEmpty(end, END_OPT);
+            result = ResponseHelper.getString(client.suspendInstances(type, entity, start, end, colo, clusters,
+                    sourceClusters, lifeCycles, doAsUser));
+        } else if (optionsList.contains(RESUME_OPT)) {
+            validateNotEmpty(start, START_OPT);
+            validateNotEmpty(end, END_OPT);
+            result = ResponseHelper.getString(client.resumeInstances(type, entity, start, end, colo, clusters,
+                    sourceClusters, lifeCycles, doAsUser));
+        } else if (optionsList.contains(RERUN_OPT)) {
+            validateNotEmpty(start, START_OPT);
+            validateNotEmpty(end, END_OPT);
+            boolean isForced = false;
+            if (optionsList.contains(FORCE_RERUN_FLAG)) {
+                isForced = true;
+            }
+            result = ResponseHelper.getString(client.rerunInstances(type, entity, start, end, filePath, colo,
+                    clusters, sourceClusters, lifeCycles, isForced, doAsUser));
+        } else if (optionsList.contains(LOG_OPT)) {
+            validateOrderBy(orderBy, instanceAction);
+            validateFilterBy(filterBy, instanceAction);
+            result = ResponseHelper.getString(client.getLogsOfInstances(type, entity, start, end, colo, runId,
+                    lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, doAsUser), runId);
+        } else if (optionsList.contains(PARARMS_OPT)) {
+            // start time is the nominal time of instance
+            result = ResponseHelper.getString(client.getParamsOfInstance(type, entity,
+                    start, colo, lifeCycles, doAsUser));
+        } else if (optionsList.contains(LISTING_OPT)) {
+            result = ResponseHelper.getString(client
+                    .getFeedListing(type, entity, start, end, colo, doAsUser));
+        } else {
+            throw new FalconCLIException("Invalid command");
+        }
+
+        OUT.get().println(result);
+    }
+
+    private void validateInstanceCommands(Set<String> optionsList,
+                                          String entity, String type,
+                                          String colo) throws FalconCLIException {
+
+        validateNotEmpty(entity, ENTITY_NAME_OPT);
+        validateNotEmpty(type, TYPE_OPT);
+        validateNotEmpty(colo, COLO_OPT);
+
+        if (optionsList.contains(CLUSTERS_OPT)) {
+            if (optionsList.contains(RUNNING_OPT)
+                    || optionsList.contains(LOG_OPT)
+                    || optionsList.contains(STATUS_OPT)
+                    || optionsList.contains(SUMMARY_OPT)) {
+                throw new FalconCLIException("Invalid argument: clusters");
+            }
+        }
+
+        if (optionsList.contains(SOURCECLUSTER_OPT)) {
+            if (optionsList.contains(RUNNING_OPT)
+                    || optionsList.contains(LOG_OPT)
+                    || optionsList.contains(STATUS_OPT)
+                    || optionsList.contains(SUMMARY_OPT) || !type.equals("feed")) {
+                throw new FalconCLIException("Invalid argument: sourceClusters");
+            }
+        }
+
+        if (optionsList.contains(FORCE_RERUN_FLAG)) {
+            if (!optionsList.contains(RERUN_OPT)) {
+                throw new FalconCLIException("Force option can be used only 
with instance rerun");
+            }
+        }
+    }
+
+    private List<LifeCycle> getLifeCycle(String lifeCycleValue) throws FalconCLIException {
+
+        if (lifeCycleValue != null) {
+            String[] lifeCycleValues = lifeCycleValue.split(",");
+            List<LifeCycle> lifeCycles = new ArrayList<LifeCycle>();
+            try {
+                for (String lifeCycle : lifeCycleValues) {
+                    lifeCycles.add(LifeCycle.valueOf(lifeCycle.toUpperCase().trim()));
+                }
+            } catch (IllegalArgumentException e) {
+                throw new FalconCLIException("Invalid life cycle values: " + 
lifeCycles, e);
+            }
+            return lifeCycles;
+        }
+        return null;
+    }
+
+}
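
For reviewers wanting to poke at the new sub-command class locally, here is a minimal, illustrative sketch of how FalconInstanceCLI might be driven with commons-cli. It is not part of this patch; the FalconClient(String falconUrl) constructor and the example arguments are assumptions.

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.GnuParser;
    import org.apache.falcon.cli.FalconInstanceCLI;
    import org.apache.falcon.client.FalconClient;

    public class InstanceCliSketch {
        public static void main(String[] args) throws Exception {
            // e.g. args: -running -type process -name sample-process -colo local -url http://localhost:15000
            FalconInstanceCLI instanceCLI = new FalconInstanceCLI();
            // Parse against the Options built above; GnuParser is one commons-cli parser choice.
            CommandLine commandLine = new GnuParser().parse(instanceCLI.createInstanceOptions(), args);
            // Assumed constructor taking the Falcon endpoint URL.
            FalconClient client = new FalconClient(commandLine.getOptionValue("url"));
            instanceCLI.instanceCommand(commandLine, client);
        }
    }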

http://git-wip-us.apache.org/repos/asf/falcon/blob/d6965213/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
index dbc553c..aaf59ff 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
 /**
 * Metadata extension to Falcon Command Line Interface - wraps the RESTful API for Metadata.
  */
-public class FalconMetadataCLI {
+public class FalconMetadataCLI extends FalconCLI {
 
     public static final AtomicReference<PrintStream> OUT = new AtomicReference<PrintStream>(System.out);
 
@@ -43,10 +43,7 @@ public class FalconMetadataCLI {
     public static final String DISCOVERY_OPT = "discovery";
     public static final String LIST_OPT = "list";
     public static final String RELATIONS_OPT = "relations";
-
     public static final String URL_OPTION = "url";
-    public static final String TYPE_OPT = "type";
-    public static final String CLUSTER_OPT = "cluster";
     public static final String NAME_OPT = "name";
 
     // Lineage Commands
@@ -55,16 +52,71 @@ public class FalconMetadataCLI {
     public static final String VERTICES_CMD = "vertices";
     public static final String VERTEX_EDGES_CMD = "edges";
     public static final String PIPELINE_OPT = "pipeline";
-
-
     public static final String EDGE_CMD = "edge";
     public static final String ID_OPT = "id";
     public static final String KEY_OPT = "key";
     public static final String VALUE_OPT = "value";
     public static final String DIRECTION_OPT = "direction";
 
+    public FalconMetadataCLI() throws Exception {
+        super();
+    }
+
+    public Options createMetadataOptions() {
+        Options metadataOptions = new Options();
+
+        OptionGroup group = new OptionGroup();
+        Option discovery = new Option(DISCOVERY_OPT, false, "Discover falcon 
metadata relations");
+        Option lineage = new Option(LINEAGE_OPT, false, "Get falcon metadata 
lineage information");
+        group.addOption(discovery);
+        group.addOption(lineage);
+        Option pipeline = new Option(PIPELINE_OPT, true,
+                "Get lineage graph for the entities in a pipeline");
+        metadataOptions.addOptionGroup(group);
+
+        // Add discovery options
+
+        Option list = new Option(LIST_OPT, false, "List all dimensions");
+        Option relations = new Option(RELATIONS_OPT, false, "List all 
relations for a dimension");
+        metadataOptions.addOption(list);
+        metadataOptions.addOption(relations);
+
+        Option url = new Option(URL_OPTION, true, "Falcon URL");
+        Option type = new Option(TYPE_OPT, true, "Dimension type");
+        Option name = new Option(NAME_OPT, true, "Dimension name");
+        Option cluster = new Option(CLUSTER_OPT, true, "Cluster name");
+
+        // Add lineage options
+        metadataOptions.addOption(pipeline);
+
+        metadataOptions.addOption(url);
+        metadataOptions.addOption(type);
+        metadataOptions.addOption(cluster);
+        metadataOptions.addOption(name);
 
-    public FalconMetadataCLI() {}
+        Option vertex = new Option(VERTEX_CMD, false, "show the vertices");
+        Option vertices = new Option(VERTICES_CMD, false, "show the vertices");
+        Option vertexEdges = new Option(VERTEX_EDGES_CMD, false, "show the 
edges for a given vertex");
+        Option edges = new Option(EDGE_CMD, false, "show the edges");
+        Option id = new Option(ID_OPT, true, "vertex or edge id");
+        Option key = new Option(KEY_OPT, true, "key property");
+        Option value = new Option(VALUE_OPT, true, "value property");
+        Option direction = new Option(DIRECTION_OPT, true, "edge direction 
property");
+
+        metadataOptions.addOption(vertex);
+        metadataOptions.addOption(vertices);
+        metadataOptions.addOption(vertexEdges);
+        metadataOptions.addOption(edges);
+        metadataOptions.addOption(id);
+        metadataOptions.addOption(key);
+        metadataOptions.addOption(value);
+        metadataOptions.addOption(direction);
+
+        Option doAs = new Option(FalconCLI.DO_AS_OPT, true, "doAs user");
+        metadataOptions.addOption(doAs);
+
+        return metadataOptions;
+    }
 
     public void metadataCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException {
         Set<String> optionsList = new HashSet<String>();
@@ -161,60 +213,4 @@ public class FalconMetadataCLI {
             throw new FalconCLIException("Missing argument: direction");
         }
     }
-
-    public Options createMetadataOptions() {
-        Options metadataOptions = new Options();
-
-        OptionGroup group = new OptionGroup();
-        Option discovery = new Option(DISCOVERY_OPT, false, "Discover falcon metadata relations");
-        Option lineage = new Option(LINEAGE_OPT, false, "Get falcon metadata lineage information");
-        group.addOption(discovery);
-        group.addOption(lineage);
-        Option pipeline = new Option(PIPELINE_OPT, true,
-                "Get lineage graph for the entities in a pipeline");
-        metadataOptions.addOptionGroup(group);
-
-        // Add discovery options
-
-        Option list = new Option(LIST_OPT, false, "List all dimensions");
-        Option relations = new Option(RELATIONS_OPT, false, "List all relations for a dimension");
-        metadataOptions.addOption(list);
-        metadataOptions.addOption(relations);
-
-        Option url = new Option(URL_OPTION, true, "Falcon URL");
-        Option type = new Option(TYPE_OPT, true, "Dimension type");
-        Option name = new Option(NAME_OPT, true, "Dimension name");
-        Option cluster = new Option(CLUSTER_OPT, true, "Cluster name");
-
-        // Add lineage options
-        metadataOptions.addOption(pipeline);
-
-        metadataOptions.addOption(url);
-        metadataOptions.addOption(type);
-        metadataOptions.addOption(cluster);
-        metadataOptions.addOption(name);
-
-        Option vertex = new Option(VERTEX_CMD, false, "show the vertices");
-        Option vertices = new Option(VERTICES_CMD, false, "show the vertices");
-        Option vertexEdges = new Option(VERTEX_EDGES_CMD, false, "show the edges for a given vertex");
-        Option edges = new Option(EDGE_CMD, false, "show the edges");
-        Option id = new Option(ID_OPT, true, "vertex or edge id");
-        Option key = new Option(KEY_OPT, true, "key property");
-        Option value = new Option(VALUE_OPT, true, "value property");
-        Option direction = new Option(DIRECTION_OPT, true, "edge direction property");
-
-        metadataOptions.addOption(vertex);
-        metadataOptions.addOption(vertices);
-        metadataOptions.addOption(vertexEdges);
-        metadataOptions.addOption(edges);
-        metadataOptions.addOption(id);
-        metadataOptions.addOption(key);
-        metadataOptions.addOption(value);
-        metadataOptions.addOption(direction);
-
-        Option doAs = new Option(FalconCLI.DO_AS_OPT, true, "doAs user");
-        metadataOptions.addOption(doAs);
-
-        return metadataOptions;
-    }
 }
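
The point of subclassing FalconCLI above is that the duplicated TYPE_OPT and CLUSTER_OPT constants can be dropped while existing references keep compiling. A tiny illustrative sketch of that effect, assuming those constants stay public on FalconCLI:

    import org.apache.falcon.cli.FalconCLI;
    import org.apache.falcon.cli.FalconMetadataCLI;

    public class InheritedConstantsSketch {
        public static void main(String[] args) {
            // Both names resolve to the same inherited constants on FalconCLI.
            System.out.println(FalconMetadataCLI.TYPE_OPT.equals(FalconCLI.TYPE_OPT));
            System.out.println(FalconMetadataCLI.CLUSTER_OPT.equals(FalconCLI.CLUSTER_OPT));
        }
    }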

http://git-wip-us.apache.org/repos/asf/falcon/blob/d6965213/client/src/main/java/org/apache/falcon/cli/FalconRecipeCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconRecipeCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconRecipeCLI.java
new file mode 100644
index 0000000..b93fed2
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/cli/FalconRecipeCLI.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.falcon.client.FalconCLIException;
+import org.apache.falcon.client.FalconClient;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Recipe extension to Falcon Command Line Interface - wraps the RESTful API for Recipe.
+ */
+public class FalconRecipeCLI extends FalconCLI {
+
+    private static final String RECIPE_NAME = "name";
+    private static final String RECIPE_OPERATION = "operation";
+    private static final String RECIPE_TOOL_CLASS_NAME = "tool";
+
+    public FalconRecipeCLI() throws Exception {
+        super();
+    }
+
+    public Options createRecipeOptions() {
+        Options recipeOptions = new Options();
+        Option url = new Option(URL_OPTION, true, "Falcon URL");
+        recipeOptions.addOption(url);
+
+        Option recipeFileOpt = new Option(RECIPE_NAME, true, "recipe name");
+        recipeOptions.addOption(recipeFileOpt);
+
+        Option recipeToolClassName = new Option(RECIPE_TOOL_CLASS_NAME, true, "recipe class");
+        recipeOptions.addOption(recipeToolClassName);
+
+        Option recipeOperation = new Option(RECIPE_OPERATION, true, "recipe 
operation");
+        recipeOptions.addOption(recipeOperation);
+
+        Option skipDryRunOperation = new Option(SKIPDRYRUN_OPT, false, "skip 
dryrun operation");
+        recipeOptions.addOption(skipDryRunOperation);
+
+        Option doAs = new Option(DO_AS_OPT, true, "doAs user");
+        recipeOptions.addOption(doAs);
+
+        return recipeOptions;
+    }
+
+    public void recipeCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException {
+        Set<String> optionsList = new HashSet<String>();
+        for (Option option : commandLine.getOptions()) {
+            optionsList.add(option.getOpt());
+        }
+
+        String recipeName = commandLine.getOptionValue(RECIPE_NAME);
+        String recipeToolClass = commandLine.getOptionValue(RECIPE_TOOL_CLASS_NAME);
+        String recipeOperation = commandLine.getOptionValue(RECIPE_OPERATION);
+        String doAsUser = commandLine.getOptionValue(DO_AS_OPT);
+
+        validateNotEmpty(recipeName, RECIPE_NAME);
+        validateNotEmpty(recipeOperation, RECIPE_OPERATION);
+        validateRecipeOperations(recipeOperation);
+        Boolean skipDryRun = null;
+        if (optionsList.contains(SKIPDRYRUN_OPT)) {
+            skipDryRun = true;
+        }
+
+        String result = client.submitRecipe(recipeName, recipeToolClass,
+                recipeOperation, skipDryRun, doAsUser).toString();
+        OUT.get().println(result);
+    }
+
+    private static void validateRecipeOperations(String recipeOperation) throws FalconCLIException {
+        for (RecipeOperation operation : RecipeOperation.values()) {
+            if (operation.toString().equalsIgnoreCase(recipeOperation)) {
+                return;
+            }
+        }
+        throw new FalconCLIException("Allowed Recipe operations: "
+                + java.util.Arrays.asList(RecipeOperation.values()));
+    }
+}
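
The recipe sub-command follows the same pattern; a minimal usage sketch, again treating the FalconClient(String falconUrl) constructor and the sample arguments as assumptions rather than part of this patch:

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.GnuParser;
    import org.apache.falcon.cli.FalconRecipeCLI;
    import org.apache.falcon.client.FalconClient;

    public class RecipeCliSketch {
        public static void main(String[] args) throws Exception {
            // e.g. args: -name hdfs-replication -operation HDFS_REPLICATION -url http://localhost:15000
            FalconRecipeCLI recipeCLI = new FalconRecipeCLI();
            CommandLine commandLine = new GnuParser().parse(recipeCLI.createRecipeOptions(), args);
            FalconClient client = new FalconClient(commandLine.getOptionValue("url")); // assumed constructor
            recipeCLI.recipeCommand(commandLine, client);
        }
    }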

http://git-wip-us.apache.org/repos/asf/falcon/blob/d6965213/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 07aacdd..a1668c1 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -720,11 +720,11 @@ public class FalconCLIIT {
         String clusterName = overlay.get("cluster");
 
         Assert.assertEquals(executeWithURL(FalconCLI.ENTITY_CMD + " -" + FalconCLI.SCHEDULE_OPT + " -"
-                        + FalconCLI.ENTITY_TYPE_OPT + " process  -" + FalconCLI.ENTITY_NAME_OPT + " " + processName),
+                        + FalconCLI.TYPE_OPT + " process  -" + FalconCLI.ENTITY_NAME_OPT + " " + processName),
                 0);
 
         Assert.assertEquals(executeWithURL(FalconCLI.ENTITY_CMD + " -" + FalconCLI.SCHEDULE_OPT + " -"
-                + FalconCLI.ENTITY_TYPE_OPT + " feed -" + FalconCLI.ENTITY_NAME_OPT + " " + feedName), 0);
+                + FalconCLI.TYPE_OPT + " feed -" + FalconCLI.ENTITY_NAME_OPT + " " + feedName), 0);
 
         OozieTestUtils.waitForProcessWFtoStart(context);
 
@@ -764,11 +764,11 @@ public class FalconCLIIT {
         String clusterName = overlay.get("cluster");
 
         Assert.assertEquals(executeWithURL(FalconCLI.ENTITY_CMD + " -" + FalconCLI.SCHEDULE_OPT + " -"
-                        + FalconCLI.ENTITY_TYPE_OPT + " process  -" + FalconCLI.ENTITY_NAME_OPT + " " + processName),
+                        + FalconCLI.TYPE_OPT + " process  -" + FalconCLI.ENTITY_NAME_OPT + " " + processName),
                 0);
 
         Assert.assertEquals(executeWithURL(FalconCLI.ENTITY_CMD + " -" + FalconCLI.SCHEDULE_OPT + " -"
-                + FalconCLI.ENTITY_TYPE_OPT + " feed -" + FalconCLI.ENTITY_NAME_OPT + " " + feedName), 0);
+                + FalconCLI.TYPE_OPT + " feed -" + FalconCLI.ENTITY_NAME_OPT + " " + feedName), 0);
 
         OozieTestUtils.waitForProcessWFtoStart(context);
 
