Repository: falcon Updated Branches: refs/heads/master 897aa5fd1 -> 12675022a
FALCON-1897 Extension Job Management: CLI support Note: After the code refactoring there is a bug that prevents the Falcon CLI from running. sowmyaramesh has a patch that fixes this and will send a pull request. All the CLI methods were tested with her fix applied. Author: yzheng-hortonworks <[email protected]> Reviewers: "Balu Vellanki <[email protected]>" Closes #103 from yzheng-hortonworks/FALCON-1897 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/12675022 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/12675022 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/12675022 Branch: refs/heads/master Commit: 12675022a383ccdec250d2900776cdffc409c394 Parents: 897aa5f Author: yzheng-hortonworks <[email protected]> Authored: Fri Apr 22 09:04:48 2016 -0700 Committer: bvellanki <[email protected]> Committed: Fri Apr 22 09:04:48 2016 -0700 ---------------------------------------------------------------------- .../org/apache/falcon/cli/FalconEntityCLI.java | 32 ++-- .../apache/falcon/cli/FalconExtensionCLI.java | 138 ++++++++++++++--- .../org/apache/falcon/FalconCLIConstants.java | 6 + .../org/apache/falcon/client/FalconClient.java | 153 ++++++++++++++++++- .../apache/falcon/resource/InstancesResult.java | 2 +- 5 files changed, 287 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/12675022/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java index 5c3d2a6..37a6992 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java @@ -43,11 +43,6 @@ import java.util.Set; */ public class FalconEntityCLI extends FalconCLI { - private 
static final String SUBMIT_OPT = "submit"; - private static final String UPDATE_OPT = "update"; - private static final String DELETE_OPT = "delete"; - private static final String SUBMIT_AND_SCHEDULE_OPT = "submitAndSchedule"; - private static final String VALIDATE_OPT = "validate"; private static final String DEFINITION_OPT = "definition"; public static final String SLA_MISS_ALERT_OPT = "slaAlert"; @@ -55,7 +50,6 @@ public class FalconEntityCLI extends FalconCLI { private static final String PATH_OPT = "path"; private static final String TOUCH_OPT = "touch"; private static final String PROPS_OPT = "properties"; - private static final String FIELDS_OPT = "fields"; private static final String TAGS_OPT = "tags"; private static final String NUM_INSTANCES_OPT = "numInstances"; private static final String SHOWSCHEDULER_OPT = "showScheduler"; @@ -68,9 +62,9 @@ public class FalconEntityCLI extends FalconCLI { Options entityOptions = new Options(); - Option submit = new Option(SUBMIT_OPT, false, + Option submit = new Option(FalconCLIConstants.SUBMIT_OPT, false, "Submits an entity xml to Falcon"); - Option update = new Option(UPDATE_OPT, false, + Option update = new Option(FalconCLIConstants.UPDATE_OPT, false, "Updates an existing entity xml"); Option schedule = new Option(FalconCLIConstants.SCHEDULE_OPT, false, "Schedules a submited entity in Falcon"); @@ -78,11 +72,11 @@ public class FalconEntityCLI extends FalconCLI { "Suspends a running entity in Falcon"); Option resume = new Option(FalconCLIConstants.RESUME_OPT, false, "Resumes a suspended entity in Falcon"); - Option delete = new Option(DELETE_OPT, false, + Option delete = new Option(FalconCLIConstants.DELETE_OPT, false, "Deletes an entity in Falcon, and kills its instance from workflow engine"); - Option submitAndSchedule = new Option(SUBMIT_AND_SCHEDULE_OPT, false, + Option submitAndSchedule = new Option(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT, false, "Submits and entity to Falcon and schedules it immediately"); - 
Option validate = new Option(VALIDATE_OPT, false, + Option validate = new Option(FalconCLIConstants.VALIDATE_OPT, false, "Validates an entity based on the entity type"); Option status = new Option(FalconCLIConstants.STATUS_OPT, false, "Gets the status of entity"); @@ -129,7 +123,7 @@ public class FalconEntityCLI extends FalconCLI { Option colo = new Option(FalconCLIConstants.COLO_OPT, true, "Colo name"); Option cluster = new Option(FalconCLIConstants.CLUSTER_OPT, true, "Cluster name"); colo.setRequired(false); - Option fields = new Option(FIELDS_OPT, true, "Entity fields to show for a request"); + Option fields = new Option(FalconCLIConstants.FIELDS_OPT, true, "Entity fields to show for a request"); Option filterBy = new Option(FalconCLIConstants.FILTER_BY_OPT, true, "Filter returned entities by the specified status"); Option filterTags = new Option(TAGS_OPT, true, "Filter returned entities by the specified tags"); @@ -203,7 +197,7 @@ public class FalconEntityCLI extends FalconCLI { String filterTags = commandLine.getOptionValue(TAGS_OPT); String nameSubsequence = commandLine.getOptionValue(FalconCLIConstants.NAMESEQ_OPT); String tagKeywords = commandLine.getOptionValue(FalconCLIConstants.TAGKEYS_OPT); - String fields = commandLine.getOptionValue(FIELDS_OPT); + String fields = commandLine.getOptionValue(FalconCLIConstants.FIELDS_OPT); String feedInstancePath = commandLine.getOptionValue(PATH_OPT); Integer offset = parseIntegerInput(commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT), 0, "offset"); Integer numResults = parseIntegerInput(commandLine.getOptionValue(FalconCLIConstants.NUM_RESULTS_OPT), @@ -248,7 +242,7 @@ public class FalconEntityCLI extends FalconCLI { SchedulableEntityInstanceResult response = client.getFeedSlaMissPendingAlerts(entityType, entityName, start, end, colo); result = ResponseHelper.getString(response); - } else if (optionsList.contains(SUBMIT_OPT)) { + } else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) { 
validateNotEmpty(filePath, "file"); validateColo(optionsList); result = client.submit(entityType, filePath, doAsUser).getMessage(); @@ -256,16 +250,16 @@ public class FalconEntityCLI extends FalconCLI { validateNotEmpty(feedInstancePath, PATH_OPT); FeedLookupResult resp = client.reverseLookUp(entityType, feedInstancePath, doAsUser); result = ResponseHelper.getString(resp); - } else if (optionsList.contains(UPDATE_OPT)) { + } else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); validateNotEmpty(entityName, FalconCLIConstants.ENTITY_NAME_OPT); result = client.update(entityType, entityName, filePath, skipDryRun, doAsUser).getMessage(); - } else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) { + } else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); result = client.submitAndSchedule(entityType, filePath, skipDryRun, doAsUser, userProps).getMessage(); - } else if (optionsList.contains(VALIDATE_OPT)) { + } else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); result = client.validate(entityType, filePath, skipDryRun, doAsUser).getMessage(); @@ -281,7 +275,7 @@ public class FalconEntityCLI extends FalconCLI { validateNotEmpty(entityName, FalconCLIConstants.ENTITY_NAME_OPT); colo = getColo(colo); result = client.resume(entityTypeEnum, entityName, colo, doAsUser).getMessage(); - } else if (optionsList.contains(DELETE_OPT)) { + } else if (optionsList.contains(FalconCLIConstants.DELETE_OPT)) { validateColo(optionsList); validateNotEmpty(entityName, FalconCLIConstants.ENTITY_NAME_OPT); result = client.delete(entityTypeEnum, entityName, doAsUser).getMessage(); @@ -341,7 +335,7 @@ public class FalconEntityCLI extends FalconCLI { try { EntityList.EntityFieldList.valueOf(s.toUpperCase()); } catch (IllegalArgumentException ie) { - throw new 
FalconCLIException("Invalid fields argument : " + FIELDS_OPT); + throw new FalconCLIException("Invalid fields argument : " + FalconCLIConstants.FIELDS_OPT); } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/12675022/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java ---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java index c85a196..6496106 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java @@ -27,8 +27,11 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconCLIConstants; import org.apache.falcon.client.FalconCLIException; import org.apache.falcon.client.FalconClient; +import org.apache.falcon.resource.ExtensionInstanceList; +import org.apache.falcon.resource.ExtensionJobList; import java.io.PrintStream; import java.util.HashSet; @@ -41,12 +44,15 @@ import java.util.concurrent.atomic.AtomicReference; public class FalconExtensionCLI { public static final AtomicReference<PrintStream> OUT = new AtomicReference<>(System.out); - // Extension artifact repository Commands + // Extension commands public static final String ENUMERATE_OPT = "enumerate"; public static final String DEFINITION_OPT = "definition"; public static final String DESCRIBE_OPT = "describe"; + public static final String INSTANCES_OPT = "instances"; + // Input parameters public static final String ENTENSION_NAME_OPT = "extensionName"; + public static final String JOB_NAME_OPT = "jobName"; public FalconExtensionCLI() { } @@ -59,56 +65,154 @@ public class FalconExtensionCLI { String result; String extensionName = commandLine.getOptionValue(ENTENSION_NAME_OPT); + String jobName = 
commandLine.getOptionValue(JOB_NAME_OPT); + String filePath = commandLine.getOptionValue(FalconCLIConstants.FILE_PATH_OPT); + String doAsUser = commandLine.getOptionValue(FalconCLIConstants.DO_AS_OPT); if (optionsList.contains(ENUMERATE_OPT)) { result = client.enumerateExtensions(); - prettyPrintJson(result); + result = prettyPrintJson(result); } else if (optionsList.contains(DEFINITION_OPT)) { - validateExtensionName(extensionName); + validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); result = client.getExtensionDefinition(extensionName); - prettyPrintJson(result); + result = prettyPrintJson(result); } else if (optionsList.contains(DESCRIBE_OPT)) { - validateExtensionName(extensionName); + validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); result = client.getExtensionDescription(extensionName); - OUT.get().println(result); + } else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) { + validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); + result = client.submitExtensionJob(extensionName, filePath, doAsUser).getMessage(); + } else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) { + validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); + result = client.submitAndScheduleExtensionJob(extensionName, filePath, doAsUser).getMessage(); + } else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) { + validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); + result = client.updateExtensionJob(extensionName, filePath, doAsUser).getMessage(); + } else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) { + validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT); + result = 
client.validateExtensionJob(extensionName, filePath, doAsUser).getMessage(); + } else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) { + validateRequiredParameter(jobName, JOB_NAME_OPT); + result = client.scheduleExtensionJob(jobName, doAsUser).getMessage(); + } else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) { + validateRequiredParameter(jobName, JOB_NAME_OPT); + result = client.suspendExtensionJob(jobName, doAsUser).getMessage(); + } else if (optionsList.contains(FalconCLIConstants.RESUME_OPT)) { + validateRequiredParameter(jobName, JOB_NAME_OPT); + result = client.resumeExtensionJob(jobName, doAsUser).getMessage(); + } else if (optionsList.contains(FalconCLIConstants.DELETE_OPT)) { + validateRequiredParameter(jobName, JOB_NAME_OPT); + result = client.deleteExtensionJob(jobName, doAsUser).getMessage(); + } else if (optionsList.contains(FalconCLIConstants.LIST_OPT)) { + validateRequiredParameter(extensionName, ENTENSION_NAME_OPT); + ExtensionJobList jobs = client.listExtensionJob(extensionName, doAsUser, + commandLine.getOptionValue(FalconCLIConstants.SORT_ORDER_OPT), + commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT), + commandLine.getOptionValue(FalconCLIConstants.NUM_RESULTS_OPT), + commandLine.getOptionValue(FalconCLIConstants.FIELDS_OPT)); + result = jobs != null ? 
jobs.toString() : "No extension job (" + extensionName + ") found."; + } else if (optionsList.contains(INSTANCES_OPT)) { + validateRequiredParameter(jobName, JOB_NAME_OPT); + ExtensionInstanceList instances = client.listExtensionInstance(jobName, doAsUser, + commandLine.getOptionValue(FalconCLIConstants.FIELDS_OPT), + commandLine.getOptionValue(FalconCLIConstants.START_OPT), + commandLine.getOptionValue(FalconCLIConstants.END_OPT), + commandLine.getOptionValue(FalconCLIConstants.INSTANCE_STATUS_OPT), + commandLine.getOptionValue(FalconCLIConstants.ORDER_BY_OPT), + commandLine.getOptionValue(FalconCLIConstants.SORT_ORDER_OPT), + commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT), + commandLine.getOptionValue(FalconCLIConstants.NUM_RESULTS_OPT)); + result = instances != null ? instances.toString() : "No instance (" + jobName + ") found."; } else { throw new FalconCLIException("Invalid extension command"); } + OUT.get().println(result); } public Options createExtensionOptions() { Options extensionOptions = new Options(); - OptionGroup group = new OptionGroup(); Option enumerate = new Option(ENUMERATE_OPT, false, "Enumerate all extensions"); Option definition = new Option(DEFINITION_OPT, false, "Get extension definition"); Option describe = new Option(DESCRIBE_OPT, false, "Get extension description"); + Option list = new Option(FalconCLIConstants.LIST_OPT, false, "List extension jobs and associated entities"); + Option instances = new Option(INSTANCES_OPT, false, "List instances of an extension job"); + Option submit = new Option(FalconCLIConstants.SUBMIT_OPT, false, "Submit an extension job"); + Option submitAndSchedule = new Option(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT, false, + "Submit and schedule an extension job"); + Option update = new Option(FalconCLIConstants.UPDATE_OPT, false, "Update an extension job"); + Option validate = new Option(FalconCLIConstants.VALIDATE_OPT, false, "Validate an extension job"); + Option schedule = new 
Option(FalconCLIConstants.SCHEDULE_OPT, false, "Schedule an extension job"); + Option suspend = new Option(FalconCLIConstants.SUSPEND_OPT, false, "Suspend an extension job"); + Option resume = new Option(FalconCLIConstants.RESUME_OPT, false, "Resume an extension job"); + Option delete = new Option(FalconCLIConstants.DELETE_OPT, false, "Delete an extension job"); + + OptionGroup group = new OptionGroup(); group.addOption(enumerate); group.addOption(definition); group.addOption(describe); - + group.addOption(list); + group.addOption(instances); + group.addOption(submit); + group.addOption(submitAndSchedule); + group.addOption(update); + group.addOption(validate); + group.addOption(schedule); + group.addOption(suspend); + group.addOption(resume); + group.addOption(delete); extensionOptions.addOptionGroup(group); - Option name = new Option(ENTENSION_NAME_OPT, true, "Extension name"); - extensionOptions.addOption(name); + Option extensionName = new Option(ENTENSION_NAME_OPT, true, "Extension name"); + Option jobName = new Option(JOB_NAME_OPT, true, "Extension job name"); + Option instanceStatus = new Option(FalconCLIConstants.INSTANCE_STATUS_OPT, true, "Instance status"); + Option sortOrder = new Option(FalconCLIConstants.SORT_ORDER_OPT, true, "asc or desc order for results"); + Option offset = new Option(FalconCLIConstants.OFFSET_OPT, true, "Start returning instances from this offset"); + Option numResults = new Option(FalconCLIConstants.NUM_RESULTS_OPT, true, + "Number of results to return per request"); + Option fields = new Option(FalconCLIConstants.FIELDS_OPT, true, "Entity fields to show for a request"); + Option doAs = new Option(FalconCLIConstants.DO_AS_OPT, true, "doAs user"); + Option start = new Option(FalconCLIConstants.START_OPT, true, "Start time of instances"); + Option end = new Option(FalconCLIConstants.END_OPT, true, "End time of instances"); + Option status = new Option(FalconCLIConstants.STATUS_OPT, true, "Filter returned instances by status"); + 
Option orderBy = new Option(FalconCLIConstants.ORDER_BY_OPT, true, "Order returned instances by this field"); + Option filePath = new Option(FalconCLIConstants.FILE_PATH_OPT, true, "File path of extension parameters"); + + extensionOptions.addOption(extensionName); + extensionOptions.addOption(jobName); + extensionOptions.addOption(instanceStatus); + extensionOptions.addOption(sortOrder); + extensionOptions.addOption(offset); + extensionOptions.addOption(numResults); + extensionOptions.addOption(fields); + extensionOptions.addOption(doAs); + extensionOptions.addOption(start); + extensionOptions.addOption(end); + extensionOptions.addOption(status); + extensionOptions.addOption(orderBy); + extensionOptions.addOption(filePath); return extensionOptions; } - private void validateExtensionName(final String extensionName) throws FalconCLIException { - if (StringUtils.isBlank(extensionName)) { - throw new FalconCLIException("Extension name cannot be null or empty"); + private void validateRequiredParameter(final String parameter, final String parameterName) + throws FalconCLIException { + if (StringUtils.isBlank(parameter)) { + throw new FalconCLIException("The parameter " + parameterName + " cannot be null or empty"); } } - private static void prettyPrintJson(final String jsonString) { + private static String prettyPrintJson(final String jsonString) { if (StringUtils.isBlank(jsonString)) { - OUT.get().println("No result returned"); - return; + return "No result returned"; } Gson gson = new GsonBuilder().setPrettyPrinting().create(); JsonParser jp = new JsonParser(); JsonElement je = jp.parse(jsonString.trim()); - OUT.get().println(gson.toJson(je)); + return gson.toJson(je); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/12675022/client/src/main/java/org/apache/falcon/FalconCLIConstants.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java 
b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java index bfa1748..436875d 100644 --- a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java +++ b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java @@ -44,6 +44,11 @@ public final class FalconCLIConstants { public static final String ENTITY_NAME_OPT = "name"; public static final String FILE_PATH_OPT = "file"; public static final String VERSION_OPT = "version"; + public static final String SUBMIT_OPT = "submit"; + public static final String UPDATE_OPT = "update"; + public static final String DELETE_OPT = "delete"; + public static final String SUBMIT_AND_SCHEDULE_OPT = "submitAndSchedule"; + public static final String VALIDATE_OPT = "validate"; public static final String SCHEDULE_OPT = "schedule"; public static final String SUSPEND_OPT = "suspend"; public static final String RESUME_OPT = "resume"; @@ -52,6 +57,7 @@ public final class FalconCLIConstants { public static final String DEPENDENCY_OPT = "dependency"; public static final String LIST_OPT = "list"; public static final String SKIPDRYRUN_OPT = "skipDryRun"; + public static final String FIELDS_OPT = "fields"; public static final String INSTANCE_STATUS_OPT = "instanceStatus"; public static final String NAMESEQ_OPT = "nameseq"; public static final String TAGKEYS_OPT = "tagkeys"; http://git-wip-us.apache.org/repos/asf/falcon/blob/12675022/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index e8ff6f1..36fb873 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -54,6 +54,8 @@ import org.apache.falcon.metadata.RelationshipType; import org.apache.falcon.resource.APIResult; import 
org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.EntitySummaryResult; +import org.apache.falcon.resource.ExtensionInstanceList; +import org.apache.falcon.resource.ExtensionJobList; import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.FeedLookupResult; import org.apache.falcon.resource.InstanceDependencyResult; @@ -334,9 +336,19 @@ public class FalconClient extends AbstractFalconClient { */ protected static enum ExtensionOperations { - ENUMERATE("api/extensions/enumerate/", HttpMethod.GET, MediaType.APPLICATION_JSON), - DESCRIBE("api/extensions/describe/", HttpMethod.GET, MediaType.TEXT_PLAIN), - DEFINITION("api/extensions/definition", HttpMethod.GET, MediaType.APPLICATION_JSON); + ENUMERATE("api/extension/enumerate/", HttpMethod.GET, MediaType.APPLICATION_JSON), + DESCRIBE("api/extension/describe/", HttpMethod.GET, MediaType.TEXT_PLAIN), + DEFINITION("api/extension/definition", HttpMethod.GET, MediaType.APPLICATION_JSON), + LIST("api/extension/list", HttpMethod.GET, MediaType.APPLICATION_JSON), + INSTANCES("api/extension/instances", HttpMethod.GET, MediaType.APPLICATION_JSON), + SUBMIT("api/extension/submit", HttpMethod.POST, MediaType.TEXT_XML), + SUBMIT_AND_SCHEDULE("api/extension/submitAndSchedule", HttpMethod.POST, MediaType.TEXT_XML), + UPDATE("api/extension/update", HttpMethod.POST, MediaType.TEXT_XML), + VALIDATE("api/extension/validate", HttpMethod.POST, MediaType.TEXT_XML), + SCHEDULE("api/extension/schedule", HttpMethod.POST, MediaType.TEXT_XML), + SUSPEND("api/extension/suspend", HttpMethod.POST, MediaType.TEXT_XML), + RESUME("api/extension/resume", HttpMethod.POST, MediaType.TEXT_XML), + DELETE("api/extension/delete", HttpMethod.POST, MediaType.TEXT_XML); private String path; private String method; @@ -740,7 +752,7 @@ public class FalconClient extends AbstractFalconClient { return stream; } - private <T extends APIResult> T getResponse(Class<T> clazz, + private <T> T getResponse(Class<T> clazz, 
ClientResponse clientResponse) throws FalconCLIException { printClientResponse(clientResponse); checkIfSuccessful(clientResponse); @@ -823,6 +835,12 @@ public class FalconClient extends AbstractFalconClient { .method(operation.method, ClientResponse.class); } + public ClientResponse call(ExtensionOperations operation) { + return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(operation.mimeType).type(MediaType.TEXT_XML) + .method(operation.method, ClientResponse.class); + } + public ClientResponse call(Entities operation, InputStream entityStream) { return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) .accept(operation.mimeType).type(MediaType.TEXT_XML) @@ -834,6 +852,12 @@ public class FalconClient extends AbstractFalconClient { .accept(operation.mimeType).type(MediaType.TEXT_XML) .method(operation.method, ClientResponse.class, entityStream); } + + public ClientResponse call(ExtensionOperations operation, InputStream entityStream) { + return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(operation.mimeType).type(MediaType.TEXT_XML) + .method(operation.method, ClientResponse.class, entityStream); + } } public FeedLookupResult reverseLookUp(String type, String path, String doAsUser) throws FalconCLIException { @@ -988,15 +1012,130 @@ public class FalconClient extends AbstractFalconClient { } public String enumerateExtensions() throws FalconCLIException { - return sendExtensionRequest(ExtensionOperations.ENUMERATE, null); + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.ENUMERATE.path) + .call(ExtensionOperations.ENUMERATE); + return getResponse(String.class, clientResponse); } public String getExtensionDefinition(final String extensionName) throws FalconCLIException { - return sendExtensionRequest(ExtensionOperations.DEFINITION, extensionName); + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.DEFINITION.path, extensionName) + 
.call(ExtensionOperations.DEFINITION); + return getResponse(String.class, clientResponse); } public String getExtensionDescription(final String extensionName) throws FalconCLIException { - return sendExtensionRequest(ExtensionOperations.DESCRIBE, extensionName); + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.DESCRIBE.path, extensionName) + .call(ExtensionOperations.DESCRIBE); + return getResponse(String.class, clientResponse); + } + + public APIResult submitExtensionJob(final String extensionName, final String filePath, final String doAsUser) + throws FalconCLIException { + InputStream entityStream = getServletInputStream(filePath); + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.SUBMIT.path, extensionName) + .addQueryParam(DO_AS_OPT, doAsUser) + .call(ExtensionOperations.SUBMIT, entityStream); + return getResponse(APIResult.class, clientResponse); + } + + public APIResult submitAndScheduleExtensionJob(final String extensionName, final String filePath, + final String doAsUser) throws FalconCLIException { + InputStream entityStream = getServletInputStream(filePath); + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.SUBMIT_AND_SCHEDULE.path, extensionName) + .addQueryParam(DO_AS_OPT, doAsUser) + .call(ExtensionOperations.SUBMIT_AND_SCHEDULE, entityStream); + return getResponse(APIResult.class, clientResponse); + } + + public APIResult updateExtensionJob(final String extensionName, final String filePath, final String doAsUser) + throws FalconCLIException { + InputStream entityStream = getServletInputStream(filePath); + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.UPDATE.path, extensionName) + .addQueryParam(DO_AS_OPT, doAsUser) + .call(ExtensionOperations.UPDATE, entityStream); + return getResponse(APIResult.class, clientResponse); + } + + public APIResult validateExtensionJob(final String extensionName, final String filePath, 
final String doAsUser) + throws FalconCLIException { + InputStream entityStream = getServletInputStream(filePath); + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.VALIDATE.path, extensionName) + .addQueryParam(DO_AS_OPT, doAsUser) + .call(ExtensionOperations.VALIDATE, entityStream); + return getResponse(APIResult.class, clientResponse); + } + + public APIResult scheduleExtensionJob(final String jobName, final String doAsUser) throws FalconCLIException { + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.SCHEDULE.path, jobName) + .addQueryParam(DO_AS_OPT, doAsUser) + .call(ExtensionOperations.SCHEDULE); + return getResponse(APIResult.class, clientResponse); + } + + public APIResult suspendExtensionJob(final String jobName, final String doAsUser) throws FalconCLIException { + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.SUSPEND.path, jobName) + .addQueryParam(DO_AS_OPT, doAsUser) + .call(ExtensionOperations.SUSPEND); + return getResponse(APIResult.class, clientResponse); + } + + public APIResult resumeExtensionJob(final String jobName, final String doAsUser) throws FalconCLIException { + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.RESUME.path, jobName) + .addQueryParam(DO_AS_OPT, doAsUser) + .call(ExtensionOperations.RESUME); + return getResponse(APIResult.class, clientResponse); + } + + public APIResult deleteExtensionJob(final String jobName, final String doAsUser) throws FalconCLIException { + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.DELETE.path, jobName) + .addQueryParam(DO_AS_OPT, doAsUser) + .call(ExtensionOperations.DELETE); + return getResponse(APIResult.class, clientResponse); + } + + public ExtensionJobList listExtensionJob(final String extensionName, final String doAsUser, + final String sortOrder, final String offset, + final String numResults, final String fields) throws 
FalconCLIException { + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.LIST.path, extensionName) + .addQueryParam(DO_AS_OPT, doAsUser) + .addQueryParam(FIELDS, fields) + .addQueryParam(SORT_ORDER, sortOrder) + .addQueryParam(OFFSET, offset) + .addQueryParam(NUM_RESULTS, numResults) + .call(ExtensionOperations.LIST); + return getResponse(ExtensionJobList.class, clientResponse); + } + + public ExtensionInstanceList listExtensionInstance(final String jobName, final String doAsUser, final String fields, + final String start, final String end, final String status, + final String orderBy, final String sortOrder, + final String offset, final String numResults) + throws FalconCLIException { + ClientResponse clientResponse = new ResourceBuilder() + .path(ExtensionOperations.INSTANCES.path, jobName) + .addQueryParam(DO_AS_OPT, doAsUser) + .addQueryParam(FIELDS, fields) + .addQueryParam(START, start) + .addQueryParam(END, end) + .addQueryParam(INSTANCE_STATUS, status) + .addQueryParam(ORDER_BY, orderBy) + .addQueryParam(SORT_ORDER, sortOrder) + .addQueryParam(OFFSET, offset) + .addQueryParam(NUM_RESULTS, numResults) + .call(ExtensionOperations.INSTANCES); + return getResponse(ExtensionInstanceList.class, clientResponse); } private String sendExtensionRequest(final ExtensionOperations operation, http://git-wip-us.apache.org/repos/asf/falcon/blob/12675022/client/src/main/java/org/apache/falcon/resource/InstancesResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java index e12c083..f8de645 100644 --- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java +++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java @@ -181,7 +181,7 @@ public class InstancesResult extends APIResult { + (this.sourceCluster == null ? 
"" : ", source-cluster:" + this.sourceCluster) + (this.cluster == null ? "" : ", cluster:" - + this.cluster) + "}"; + + this.cluster) + "}\n"; } }
