Repository: incubator-falcon Updated Branches: refs/heads/master a38cebf42 -> 0d608aa73
FALCON-329 Falcon client methods should return objects. Contributed by Samar Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/0d608aa7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/0d608aa7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/0d608aa7 Branch: refs/heads/master Commit: 0d608aa73588f694d15a825ce2432fde0de115b7 Parents: a38cebf Author: shwethags <shwetha...@inmobi.com> Authored: Fri Dec 12 12:52:41 2014 +0530 Committer: shwethags <shwetha...@inmobi.com> Committed: Fri Dec 12 12:52:41 2014 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../java/org/apache/falcon/ResponseHelper.java | 266 +++++++++++++++ .../java/org/apache/falcon/cli/FalconCLI.java | 92 ++++-- .../org/apache/falcon/client/FalconClient.java | 327 ++++--------------- .../src/main/java/org/apache/falcon/Debug.java | 7 +- 5 files changed, 399 insertions(+), 295 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d608aa7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e9758c0..cb2edf5 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,8 @@ Trunk (Unreleased) NEW FEATURES IMPROVEMENTS + FALCON-329 Falcon client methods should return objects. (Samar via Shwetha GS) + FALCON-593 Preserve data type for properties in a vertex. (Ajay Yadav via Srikanth Sundarrajan) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d608aa7/client/src/main/java/org/apache/falcon/ResponseHelper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/ResponseHelper.java b/client/src/main/java/org/apache/falcon/ResponseHelper.java new file mode 100644 index 0000000..5f36e3a --- /dev/null +++ b/client/src/main/java/org/apache/falcon/ResponseHelper.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon; + +import java.util.Date; +import java.util.Map; +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.resource.FeedInstanceResult; +import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.resource.InstancesSummaryResult; +import org.apache.falcon.resource.EntitySummaryResult; + +/** + * Helpers for response object to string conversion. + */ + +public final class ResponseHelper { + + private ResponseHelper() { } + + public static String getString(EntitySummaryResult result) { + StringBuilder sb = new StringBuilder(); + String toAppend; + sb.append("Consolidated Status: ").append(result.getStatus()) + .append("\n"); + sb.append("\nEntity Summary Result :\n"); + if (result.getEntitySummaries() != null) { + for (EntitySummaryResult.EntitySummary entitySummary : result.getEntitySummaries()) { + toAppend = entitySummary.toString(); + sb.append(toAppend).append("\n"); + } + } + sb.append("\nAdditional Information:\n"); + sb.append("Response: ").append(result.getMessage()); + sb.append("Request Id: ").append(result.getRequestId()); + return sb.toString(); + } + + public static String getString(InstancesResult result, String runid) { + StringBuilder sb = new StringBuilder(); + String toAppend; + + sb.append("Consolidated Status: ").append(result.getStatus()) + .append("\n"); + + sb.append("\nInstances:\n"); + sb.append("Instance\t\tCluster\t\tSourceCluster\t\tStatus\t\tRunID\t\t\tLog\n"); + sb.append("-----------------------------------------------------------------------------------------------\n"); + if (result.getInstances() != null) { + for (InstancesResult.Instance instance : result.getInstances()) { + + toAppend = + (instance.getInstance() != null) ? instance.getInstance() + : "-"; + sb.append(toAppend).append("\t"); + + toAppend = + instance.getCluster() != null ? instance.getCluster() : "-"; + sb.append(toAppend).append("\t"); + + toAppend = + instance.getSourceCluster() != null ? instance + .getSourceCluster() : "-"; + sb.append(toAppend).append("\t"); + + toAppend = + (instance.getStatus() != null ? instance.getStatus() + .toString() : "-"); + sb.append(toAppend).append("\t"); + + toAppend = (runid != null ? runid : "latest"); + sb.append(toAppend).append("\t"); + + toAppend = + instance.getLogFile() != null ? instance.getLogFile() : "-"; + sb.append(toAppend).append("\n"); + + if (instance.actions != null) { + sb.append("actions:\n"); + for (InstancesResult.InstanceAction action : instance.actions) { + sb.append(" ").append(action.getAction()) + .append("\t"); + sb.append(action.getStatus()).append("\t") + .append(action.getLogFile()).append("\n"); + } + } + } + } + sb.append("\nAdditional Information:\n"); + sb.append("Response: ").append(result.getMessage()); + sb.append("Request Id: ").append(result.getRequestId()); + return sb.toString(); + } + + public static String getString(FeedInstanceResult result) { + StringBuilder sb = new StringBuilder(); + String toAppend; + + sb.append("Consolidated Status: ").append(result.getStatus()) + .append("\n"); + + sb.append("\nInstances:\n"); + sb.append("Cluster\t\tInstance\t\tStatus\t\tSize\t\tCreationTime\t\tDetails\n"); + sb.append("-----------------------------------------------------------------------------------------------\n"); + if (result.getInstances() != null) { + for (FeedInstanceResult.Instance instance : result.getInstances()) { + + toAppend = + instance.getCluster() != null ? instance.getCluster() : "-"; + sb.append(toAppend).append("\t"); + + toAppend = + instance.getInstance() != null ? instance.getInstance() + : "-"; + sb.append(toAppend).append("\t"); + + toAppend = + instance.getStatus() != null ? instance.getStatus() : "-"; + sb.append(toAppend).append("\t"); + + toAppend = + instance.getSize() != -1 ? String.valueOf(instance + .getSize()) : "-"; + sb.append(toAppend).append("\t"); + + toAppend = + instance.getCreationTime() != 0 + ? SchemaHelper.formatDateUTC(new Date(instance + .getCreationTime())) : "-"; + sb.append(toAppend).append("\t"); + + toAppend = + StringUtils.isEmpty(instance.getUri()) ? "-" : instance + .getUri(); + sb.append(toAppend).append("\n"); + } + } + sb.append("\nAdditional Information:\n"); + sb.append("Response: ").append(result.getMessage()); + sb.append("Request Id: ").append(result.getRequestId()); + return sb.toString(); + } + + public static String getString(InstancesResult result) { + StringBuilder sb = new StringBuilder(); + String toAppend; + + sb.append("Consolidated Status: ").append(result.getStatus()) + .append("\n"); + + sb.append("\nInstances:\n"); + sb.append("Instance\t\tCluster\t\tSourceCluster\t\tStatus\t\tStart\t\tEnd\t\tDetails\t\t\t\t\tLog\n"); + sb.append("-----------------------------------------------------------------------------------------------\n"); + if (result.getInstances() != null) { + for (InstancesResult.Instance instance : result.getInstances()) { + + toAppend = + instance.getInstance() != null ? instance.getInstance() + : "-"; + sb.append(toAppend).append("\t"); + + toAppend = + instance.getCluster() != null ? instance.getCluster() : "-"; + sb.append(toAppend).append("\t"); + + toAppend = + instance.getSourceCluster() != null ? instance + .getSourceCluster() : "-"; + sb.append(toAppend).append("\t"); + + toAppend = + (instance.getStatus() != null ? instance.getStatus() + .toString() : "-"); + sb.append(toAppend).append("\t"); + + toAppend = instance.getStartTime() != null + ? SchemaHelper.formatDateUTC(instance.getStartTime()) : "-"; + sb.append(toAppend).append("\t"); + + toAppend = instance.getEndTime() != null + ? SchemaHelper.formatDateUTC(instance.getEndTime()) : "-"; + sb.append(toAppend).append("\t"); + + toAppend = (!StringUtils.isEmpty(instance.getDetails())) + ? instance.getDetails() : "-"; + sb.append(toAppend).append("\t"); + + toAppend = + instance.getLogFile() != null ? instance.getLogFile() : "-"; + sb.append(toAppend).append("\n"); + + if (instance.getWfParams() != null) { + Map<String, String> props = instance.getWfParams(); + sb.append("Workflow params").append("\n"); + for (Map.Entry<String, String> entry : props.entrySet()) { + sb.append(entry.getKey()).append("=") + .append(entry.getValue()).append("\n"); + } + sb.append("\n"); + } + + if (instance.actions != null) { + sb.append("actions:\n"); + for (InstancesResult.InstanceAction action : instance.actions) { + sb.append(" ").append(action.getAction()).append("\t"); + sb.append(action.getStatus()).append("\t") + .append(action.getLogFile()).append("\n"); + } + } + } + } + sb.append("\nAdditional Information:\n"); + sb.append("Response: ").append(result.getMessage()); + sb.append("Request Id: ").append(result.getRequestId()); + return sb.toString(); + } + + public static String getString(InstancesSummaryResult result) { + StringBuilder sb = new StringBuilder(); + String toAppend; + + sb.append("Consolidated Status: ").append(result.getStatus()) + .append("\n"); + sb.append("\nInstances Summary:\n"); + + if (result.getInstancesSummary() != null) { + for (InstancesSummaryResult.InstanceSummary summary : result + .getInstancesSummary()) { + toAppend = + summary.getCluster() != null ? summary.getCluster() : "-"; + sb.append("Cluster: ").append(toAppend).append("\n"); + + sb.append("Status\t\tCount\n"); + sb.append("-------------------------\n"); + + for (Map.Entry<String, Long> entry : summary.getSummaryMap() + .entrySet()) { + sb.append(entry.getKey()).append("\t\t") + .append(entry.getValue()).append("\n"); + } + } + } + + sb.append("\nAdditional Information:\n"); + sb.append("Response: ").append(result.getMessage()); + sb.append("Request Id: ").append(result.getRequestId()); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d608aa7/client/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java index c4fc20f..93776d3 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -18,6 +18,7 @@ package org.apache.falcon.cli; +import org.apache.falcon.ResponseHelper; import com.sun.jersey.api.client.ClientHandlerException; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; @@ -243,42 +244,71 @@ public class FalconCLI { if (optionsList.contains(RUNNING_OPT)) { validateOrderBy(orderBy, instanceAction); validateFilterBy(filterBy, instanceAction); - result = client.getRunningInstances(type, entity, colo, lifeCycles, filterBy, orderBy, sortOrder, - offset, numResults); + result = + ResponseHelper.getString(client.getRunningInstances(type, + entity, colo, lifeCycles, filterBy, orderBy, sortOrder, + offset, numResults)); } else if (optionsList.contains(STATUS_OPT) || optionsList.contains(LIST_OPT)) { validateOrderBy(orderBy, instanceAction); validateFilterBy(filterBy, instanceAction); - result = client.getStatusOfInstances(type, entity, start, end, colo, lifeCycles, - filterBy, orderBy, sortOrder, offset, numResults); + result = + ResponseHelper.getString(client + .getStatusOfInstances(type, entity, start, end, colo, + lifeCycles, + filterBy, orderBy, sortOrder, offset, numResults)); } else if (optionsList.contains(SUMMARY_OPT)) { - result = client.getSummaryOfInstances(type, entity, start, end, colo, lifeCycles); + result = + ResponseHelper.getString(client + .getSummaryOfInstances(type, entity, start, end, colo, + lifeCycles)); } else if (optionsList.contains(KILL_OPT)) { validateNotEmpty(start, START_OPT); validateNotEmpty(end, END_OPT); - result = client.killInstances(type, entity, start, end, colo, clusters, sourceClusters, lifeCycles); + result = + ResponseHelper.getString(client + .killInstances(type, entity, start, end, colo, clusters, + sourceClusters, lifeCycles)); } else if (optionsList.contains(SUSPEND_OPT)) { validateNotEmpty(start, START_OPT); validateNotEmpty(end, END_OPT); - result = client.suspendInstances(type, entity, start, end, colo, clusters, sourceClusters, lifeCycles); + result = + ResponseHelper.getString(client + .suspendInstances(type, entity, start, end, colo, clusters, + sourceClusters, lifeCycles)); } else if (optionsList.contains(RESUME_OPT)) { validateNotEmpty(start, START_OPT); validateNotEmpty(end, END_OPT); - result = client.resumeInstances(type, entity, start, end, colo, clusters, sourceClusters, lifeCycles); + result = + ResponseHelper.getString(client + .resumeInstances(type, entity, start, end, colo, clusters, + sourceClusters, lifeCycles)); } else if (optionsList.contains(RERUN_OPT)) { validateNotEmpty(start, START_OPT); validateNotEmpty(end, END_OPT); - result = client.rerunInstances(type, entity, start, end, filePath, colo, clusters, sourceClusters, - lifeCycles); + result = + ResponseHelper.getString(client + .rerunInstances(type, entity, start, end, filePath, colo, + clusters, sourceClusters, + lifeCycles)); } else if (optionsList.contains(LOG_OPT)) { validateOrderBy(orderBy, instanceAction); validateFilterBy(filterBy, instanceAction); - result = client.getLogsOfInstances(type, entity, start, end, colo, runId, lifeCycles, - filterBy, orderBy, sortOrder, offset, numResults); + result = + ResponseHelper.getString(client + .getLogsOfInstances(type, entity, start, end, colo, runId, + lifeCycles, + filterBy, orderBy, sortOrder, offset, numResults), + runId); } else if (optionsList.contains(PARARMS_OPT)) { // start time is the nominal time of instance - result = client.getParamsOfInstance(type, entity, start, colo, lifeCycles); + result = + ResponseHelper + .getString(client.getParamsOfInstance( + type, entity, start, colo, lifeCycles)); } else if (optionsList.contains(LISTING_OPT)) { - result = client.getFeedListing(type, entity, start, end, colo); + result = + ResponseHelper.getString(client + .getFeedListing(type, entity, start, end, colo)); } else { throw new FalconCLIException("Invalid command"); } @@ -352,51 +382,54 @@ public class FalconCLI { FalconClient.DEFAULT_NUM_RESULTS, "numResults"); Integer numInstances = parseIntegerInput(commandLine.getOptionValue(NUM_INSTANCES_OPT), 7, "numInstances"); validateNotEmpty(entityType, ENTITY_TYPE_OPT); + EntityType entityTypeEnum = EntityType.valueOf(entityType.toUpperCase()); validateSortOrder(sortOrder); String entityAction = "entity"; if (optionsList.contains(SUBMIT_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); - result = client.submit(entityType, filePath); + result = client.submit(entityType, filePath).getMessage(); } else if (optionsList.contains(UPDATE_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); validateNotEmpty(entityName, ENTITY_NAME_OPT); Date effectiveTime = parseDateString(time); - result = client.update(entityType, entityName, filePath, effectiveTime); + result = client.update(entityType, entityName, filePath, effectiveTime).getMessage(); } else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); - result = client.submitAndSchedule(entityType, filePath); + result = + client.submitAndSchedule(entityType, filePath).getMessage(); } else if (optionsList.contains(VALIDATE_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); - result = client.validate(entityType, filePath); + result = client.validate(entityType, filePath).getMessage(); } else if (optionsList.contains(SCHEDULE_OPT)) { validateNotEmpty(entityName, ENTITY_NAME_OPT); colo = getColo(colo); - result = client.schedule(entityType, entityName, colo); + result = client.schedule(entityTypeEnum, entityName, colo).getMessage(); } else if (optionsList.contains(SUSPEND_OPT)) { validateNotEmpty(entityName, ENTITY_NAME_OPT); colo = getColo(colo); - result = client.suspend(entityType, entityName, colo); + result = client.suspend(entityTypeEnum, entityName, colo).getMessage(); } else if (optionsList.contains(RESUME_OPT)) { validateNotEmpty(entityName, ENTITY_NAME_OPT); colo = getColo(colo); - result = client.resume(entityType, entityName, colo); + result = client.resume(entityTypeEnum, entityName, colo).getMessage(); } else if (optionsList.contains(DELETE_OPT)) { validateColo(optionsList); validateNotEmpty(entityName, ENTITY_NAME_OPT); - result = client.delete(entityType, entityName); + result = client.delete(entityTypeEnum, entityName).getMessage(); } else if (optionsList.contains(STATUS_OPT)) { validateNotEmpty(entityName, ENTITY_NAME_OPT); colo = getColo(colo); - result = client.getStatus(entityType, entityName, colo); + result = + client.getStatus(entityTypeEnum, entityName, colo).getMessage(); } else if (optionsList.contains(DEFINITION_OPT)) { validateColo(optionsList); validateNotEmpty(entityName, ENTITY_NAME_OPT); - result = client.getDefinition(entityType, entityName); + result = client.getDefinition(entityType, entityName).toString(); } else if (optionsList.contains(DEPENDENCY_OPT)) { validateColo(optionsList); validateNotEmpty(entityName, ENTITY_NAME_OPT); @@ -415,8 +448,12 @@ public class FalconCLI { validateEntityFields(fields); validateFilterBy(filterBy, entityAction); validateOrderBy(orderBy, entityAction); - result = client.getEntitySummary(entityType, cluster, start, end, fields, filterBy, filterTags, - orderBy, sortOrder, offset, numResults, numInstances); + result = + ResponseHelper.getString(client + .getEntitySummary( + entityType, cluster, start, end, fields, filterBy, + filterTags, + orderBy, sortOrder, offset, numResults, numInstances)); } else if (optionsList.contains(HELP_CMD)) { OUT.get().println("Falcon Help"); } else { @@ -879,7 +916,8 @@ public class FalconCLI { validateNotEmpty(recipeName, RECIPE_NAME); - String result = client.submitRecipe(recipeName, recipeToolClass); + String result = + client.submitRecipe(recipeName, recipeToolClass).getMessage(); OUT.get().println(result); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d608aa7/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 9a286ed..23c8943 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -18,6 +18,8 @@ package org.apache.falcon.client; +import org.apache.falcon.entity.v0.Entity; + import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; @@ -41,7 +43,6 @@ import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; import org.apache.hadoop.security.authentication.client.PseudoAuthenticator; - import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; @@ -65,7 +66,6 @@ import java.net.URL; import java.security.SecureRandom; import java.util.Date; import java.util.List; -import java.util.Map; import java.util.Properties; /** @@ -279,7 +279,7 @@ public class FalconClient { return str; } - public String schedule(String entityType, String entityName, String colo) + public APIResult schedule(EntityType entityType, String entityName, String colo) throws FalconCLIException { return sendEntityRequest(Entities.SCHEDULE, entityType, entityName, @@ -287,28 +287,28 @@ public class FalconClient { } - public String suspend(String entityType, String entityName, String colo) + public APIResult suspend(EntityType entityType, String entityName, String colo) throws FalconCLIException { return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo); } - public String resume(String entityType, String entityName, String colo) + public APIResult resume(EntityType entityType, String entityName, String colo) throws FalconCLIException { return sendEntityRequest(Entities.RESUME, entityType, entityName, colo); } - public String delete(String entityType, String entityName) + public APIResult delete(EntityType entityType, String entityName) throws FalconCLIException { return sendEntityRequest(Entities.DELETE, entityType, entityName, null); } - public String validate(String entityType, String filePath) + public APIResult validate(String entityType, String filePath) throws FalconCLIException { InputStream entityStream = getServletInputStream(filePath); @@ -316,7 +316,7 @@ public class FalconClient { entityStream, null); } - public String submit(String entityType, String filePath) + public APIResult submit(String entityType, String filePath) throws FalconCLIException { InputStream entityStream = getServletInputStream(filePath); @@ -324,7 +324,7 @@ public class FalconClient { entityStream, null); } - public String update(String entityType, String entityName, String filePath, Date effectiveTime) + public APIResult update(String entityType, String entityName, String filePath, Date effectiveTime) throws FalconCLIException { InputStream entityStream = getServletInputStream(filePath); Entities operation = Entities.UPDATE; @@ -340,7 +340,7 @@ public class FalconClient { return parseAPIResult(clientResponse); } - public String submitAndSchedule(String entityType, String filePath) + public APIResult submitAndSchedule(String entityType, String filePath) throws FalconCLIException { InputStream entityStream = getServletInputStream(filePath); @@ -348,13 +348,13 @@ public class FalconClient { entityType, entityStream, null); } - public String getStatus(String entityType, String entityName, String colo) + public APIResult getStatus(EntityType entityType, String entityName, String colo) throws FalconCLIException { return sendEntityRequest(Entities.STATUS, entityType, entityName, colo); } - public String getDefinition(String entityType, String entityName) + public Entity getDefinition(String entityType, String entityName) throws FalconCLIException { return sendDefinitionRequest(Entities.DEFINITION, entityType, @@ -375,7 +375,7 @@ public class FalconClient { filterTags, orderBy, sortOrder, offset, numResults); } - public String getEntitySummary(String entityType, String cluster, String start, String end, + public EntitySummaryResult getEntitySummary(String entityType, String cluster, String start, String end, String fields, String filterBy, String filterTags, String orderBy, String sortOrder, Integer offset, Integer numResults, Integer numInstances) @@ -384,40 +384,43 @@ public class FalconClient { orderBy, sortOrder, offset, numResults, numInstances); } - public String getRunningInstances(String type, String entity, String colo, List<LifeCycle> lifeCycles, + public InstancesResult getRunningInstances(String type, String entity, String colo, List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, Integer offset, Integer numResults) throws FalconCLIException { return sendInstanceRequest(Instances.RUNNING, type, entity, null, null, - null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults); + null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults) + .getEntity(InstancesResult.class); } - public String getStatusOfInstances(String type, String entity, + public InstancesResult getStatusOfInstances(String type, String entity, String start, String end, String colo, List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, Integer offset, Integer numResults) throws FalconCLIException { return sendInstanceRequest(Instances.STATUS, type, entity, start, end, - null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults); + null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults) + .getEntity(InstancesResult.class); } - public String getSummaryOfInstances(String type, String entity, + public InstancesSummaryResult getSummaryOfInstances(String type, String entity, String start, String end, String colo, List<LifeCycle> lifeCycles) throws FalconCLIException { - return sendInstanceRequest(Instances.SUMMARY, type, entity, start, end, - null, null, colo, lifeCycles); + return sendInstanceRequest(Instances.SUMMARY, type, entity, start, end, null, + null, colo, lifeCycles, "", "", "", 0, DEFAULT_NUM_RESULTS).getEntity(InstancesSummaryResult.class); } - public String getFeedListing(String type, String entity, String start, + public FeedInstanceResult getFeedListing(String type, String entity, String start, String end, String colo) throws FalconCLIException { - return sendInstanceRequest(Instances.LISTING, type, entity, start, end, null, null, colo, null); + return sendInstanceRequest(Instances.LISTING, type, entity, start, end, null, + null, colo, null, "", "", "", 0, DEFAULT_NUM_RESULTS).getEntity(FeedInstanceResult.class); } - public String killInstances(String type, String entity, String start, + public InstancesResult killInstances(String type, String entity, String start, String end, String colo, String clusters, String sourceClusters, List<LifeCycle> lifeCycles) throws FalconCLIException, UnsupportedEncodingException { @@ -426,7 +429,7 @@ public class FalconClient { getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles); } - public String suspendInstances(String type, String entity, String start, + public InstancesResult suspendInstances(String type, String entity, String start, String end, String colo, String clusters, String sourceClusters, List<LifeCycle> lifeCycles) throws FalconCLIException, UnsupportedEncodingException { @@ -435,7 +438,7 @@ public class FalconClient { getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles); } - public String resumeInstances(String type, String entity, String start, + public InstancesResult resumeInstances(String type, String entity, String start, String end, String colo, String clusters, String sourceClusters, List<LifeCycle> lifeCycles) throws FalconCLIException, UnsupportedEncodingException { @@ -444,7 +447,7 @@ public class FalconClient { getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles); } - public String rerunInstances(String type, String entity, String start, + public InstancesResult rerunInstances(String type, String entity, String start, String end, String filePath, String colo, String clusters, String sourceClusters, List<LifeCycle> lifeCycles) throws FalconCLIException, IOException { @@ -468,17 +471,18 @@ public class FalconClient { getServletInputStream(clusters, sourceClusters, temp), null, colo, lifeCycles); } - public String getLogsOfInstances(String type, String entity, String start, + public InstancesResult getLogsOfInstances(String type, String entity, String start, String end, String colo, String runId, List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, Integer offset, Integer numResults) throws FalconCLIException { return sendInstanceRequest(Instances.LOG, type, entity, start, end, - null, runId, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults); + null, runId, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults) + .getEntity(InstancesResult.class); } - public String getParamsOfInstance(String type, String entity, + public InstancesResult getParamsOfInstance(String type, String entity, String start, String colo, List<LifeCycle> lifeCycles) throws FalconCLIException, UnsupportedEncodingException { @@ -553,11 +557,11 @@ public class FalconClient { return (buffer.length() == 0) ? null : stream; } - private String sendEntityRequest(Entities entities, String entityType, + private APIResult sendEntityRequest(Entities entities, EntityType entityType, String entityName, String colo) throws FalconCLIException { WebResource resource = service.path(entities.path) - .path(entityType).path(entityName); + .path(entityType.toString().toLowerCase()).path(entityName); if (colo != null) { resource = resource.queryParam("colo", colo); } @@ -568,7 +572,8 @@ public class FalconClient { checkIfSuccessful(clientResponse); - return parseAPIResult(clientResponse); + // should be removed return parseAPIResult(clientResponse); + return clientResponse.getEntity(APIResult.class); } private WebResource addParamsToResource(WebResource resource, @@ -617,7 +622,7 @@ public class FalconClient { } - private String sendEntitySummaryRequest(Entities entities, String entityType, String cluster, + private EntitySummaryResult sendEntitySummaryRequest(Entities entities, String entityType, String cluster, String start, String end, String fields, String filterBy, String filterTags, String orderBy, String sortOrder, Integer offset, Integer numResults, @@ -638,11 +643,11 @@ public class FalconClient { .method(entities.method, ClientResponse.class); checkIfSuccessful(clientResponse); - return parseProcessEntitySummaryResult(clientResponse); + return clientResponse.getEntity(EntitySummaryResult.class); } //RESUME CHECKSTYLE CHECK ParameterNumberCheck - private String sendDefinitionRequest(Entities entities, String entityType, + private Entity sendDefinitionRequest(Entities entities, String entityType, String entityName) throws FalconCLIException { ClientResponse clientResponse = service @@ -652,7 +657,11 @@ public class FalconClient { .method(entities.method, ClientResponse.class); checkIfSuccessful(clientResponse); - return clientResponse.getEntity(String.class); + String entity = clientResponse.getEntity(String.class); + + return Entity.fromString(EntityType.valueOf(entityType.toUpperCase()), + entity); + } private EntityList sendDependencyRequest(Entities entities, String entityType, @@ -669,7 +678,7 @@ public class FalconClient { return parseEntityList(clientResponse); } - private String sendEntityRequestWithObject(Entities entities, String entityType, + private APIResult sendEntityRequestWithObject(Entities entities, String entityType, Object requestObject, String colo) throws FalconCLIException { WebResource resource = service.path(entities.path) .path(entityType); @@ -683,19 +692,21 @@ public class FalconClient { checkIfSuccessful(clientResponse); - return parseAPIResult(clientResponse); + //remove this return parseAPIResult(clientResponse); + return clientResponse.getEntity(APIResult.class); } //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck - private String sendInstanceRequest(Instances instances, String type, + private InstancesResult sendInstanceRequest(Instances instances, String type, String entity, String start, String end, InputStream props, String runid, String colo, List<LifeCycle> lifeCycles) throws FalconCLIException { return sendInstanceRequest(instances, type, entity, start, end, props, - runid, colo, lifeCycles, "", "", "", 0, DEFAULT_NUM_RESULTS); + runid, colo, lifeCycles, "", "", "", 0, DEFAULT_NUM_RESULTS) + .getEntity(InstancesResult.class); } - private String sendInstanceRequest(Instances instances, String type, String entity, + private ClientResponse sendInstanceRequest(Instances instances, String type, String entity, String start, String end, InputStream props, String runid, String colo, List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, Integer offset, Integer numResults) throws FalconCLIException { @@ -726,17 +737,7 @@ public class FalconClient { .method(instances.method, ClientResponse.class, props); } checkIfSuccessful(clientResponse); - - switch (instances) { - case LOG: - return parseProcessInstanceResultLogs(clientResponse, runid); - case SUMMARY: - return summarizeProcessInstanceResult(clientResponse); - case LISTING: - return parseFeedInstanceResult(clientResponse); - default: - return parseProcessInstanceResult(clientResponse); - } + return clientResponse; } //RESUME CHECKSTYLE CHECK VisibilityModifierCheck @@ -790,7 +791,7 @@ public class FalconClient { .accept(job.mimeType) .type(job.mimeType) .method(job.method, ClientResponse.class); - return parseStringResult(clientResponse); + return clientResponse.getEntity(String.class); } private String sendMetadataDiscoveryRequest(final MetadataOperations operation, @@ -826,14 +827,13 @@ public class FalconClient { .method(operation.method, ClientResponse.class); checkIfSuccessful(clientResponse); - return parseStringResult(clientResponse); + return clientResponse.getEntity(String.class); } - private String parseAPIResult(ClientResponse clientResponse) + private APIResult parseAPIResult(ClientResponse clientResponse) throws FalconCLIException { - APIResult result = clientResponse.getEntity(APIResult.class); - return result.getMessage(); + return clientResponse.getEntity(APIResult.class); } private EntityList parseEntityList(ClientResponse clientResponse) @@ -847,211 +847,6 @@ public class FalconClient { } - private String parseStringResult(ClientResponse clientResponse) - throws FalconCLIException { - - return clientResponse.getEntity(String.class); - } - - private String parseProcessEntitySummaryResult(ClientResponse clientResponse) { - EntitySummaryResult result = clientResponse.getEntity(EntitySummaryResult.class); - StringBuilder sb = new StringBuilder(); - String toAppend; - sb.append("Consolidated Status: ").append(result.getStatus()).append("\n"); - sb.append("\nEntity Summary Result :\n"); - if (result.getEntitySummaries() != null) { - for (EntitySummaryResult.EntitySummary entitySummary : result.getEntitySummaries()) { - - toAppend = entitySummary.toString(); - sb.append(toAppend).append("\n"); - } - } - sb.append("\nAdditional Information:\n"); - sb.append("Response: ").append(result.getMessage()); - sb.append("Request Id: ").append(result.getRequestId()); - return sb.toString(); - } - - private String summarizeProcessInstanceResult(ClientResponse clientResponse) { - InstancesSummaryResult result = clientResponse - .getEntity(InstancesSummaryResult.class); - StringBuilder sb = new StringBuilder(); - String toAppend; - - sb.append("Consolidated Status: ").append(result.getStatus()).append("\n"); - sb.append("\nInstances Summary:\n"); - - if (result.getInstancesSummary() != null) { - for (InstancesSummaryResult.InstanceSummary summary : result.getInstancesSummary()) { - toAppend = summary.getCluster() != null ? summary.getCluster() : "-"; - sb.append("Cluster: ").append(toAppend).append("\n"); - - sb.append("Status\t\tCount\n"); - sb.append("-------------------------\n"); - - for (Map.Entry<String, Long> entry : summary.getSummaryMap().entrySet()) { - sb.append(entry.getKey()).append("\t\t").append(entry.getValue()).append("\n"); - } - } - } - - sb.append("\nAdditional Information:\n"); - sb.append("Response: ").append(result.getMessage()); - sb.append("Request Id: ").append(result.getRequestId()); - return sb.toString(); - } - - private String parseProcessInstanceResult(ClientResponse clientResponse) { - InstancesResult result = clientResponse - .getEntity(InstancesResult.class); - StringBuilder sb = new StringBuilder(); - String toAppend; - - sb.append("Consolidated Status: ").append(result.getStatus()).append("\n"); - - sb.append("\nInstances:\n"); - sb.append("Instance\t\tCluster\t\tSourceCluster\t\tStatus\t\tStart\t\tEnd\t\tDetails\t\t\t\t\tLog\n"); - sb.append("-----------------------------------------------------------------------------------------------\n"); - if (result.getInstances() != null) { - for (InstancesResult.Instance instance : result.getInstances()) { - - toAppend = instance.getInstance() != null ? instance.getInstance() : "-"; - sb.append(toAppend).append("\t"); - - toAppend = instance.getCluster() != null ? instance.getCluster() : "-"; - sb.append(toAppend).append("\t"); - - toAppend = instance.getSourceCluster() != null ? instance.getSourceCluster() : "-"; - sb.append(toAppend).append("\t"); - - toAppend = (instance.getStatus() != null ? instance.getStatus().toString() : "-"); - sb.append(toAppend).append("\t"); - - toAppend = instance.getStartTime() != null - ? SchemaHelper.formatDateUTC(instance.getStartTime()) : "-"; - sb.append(toAppend).append("\t"); - - toAppend = instance.getEndTime() != null - ? SchemaHelper.formatDateUTC(instance.getEndTime()) : "-"; - sb.append(toAppend).append("\t"); - - toAppend = (!StringUtils.isEmpty(instance.getDetails())) - ? instance.getDetails() : "-"; - sb.append(toAppend).append("\t"); - - toAppend = instance.getLogFile() != null ? instance.getLogFile() : "-"; - sb.append(toAppend).append("\n"); - - if (instance.getWfParams() != null) { - Map<String, String> props = instance.getWfParams(); - sb.append("Workflow params").append("\n"); - for (Map.Entry<String, String> entry : props.entrySet()) { - sb.append(entry.getKey()).append("=").append(entry.getValue()).append("\n"); - } - sb.append("\n"); - } - - if (instance.actions != null) { - sb.append("actions:\n"); - for (InstancesResult.InstanceAction action : instance.actions) { - sb.append(" ").append(action.getAction()).append("\t"); - sb.append(action.getStatus()).append("\t").append(action.getLogFile()).append("\n"); - } - } - } - } - sb.append("\nAdditional Information:\n"); - sb.append("Response: ").append(result.getMessage()); - sb.append("Request Id: ").append(result.getRequestId()); - return sb.toString(); - } - - private String parseProcessInstanceResultLogs(ClientResponse clientResponse, String runid) { - InstancesResult result = clientResponse - .getEntity(InstancesResult.class); - StringBuilder sb = new StringBuilder(); - String toAppend; - - sb.append("Consolidated Status: ").append(result.getStatus()).append("\n"); - - sb.append("\nInstances:\n"); - sb.append("Instance\t\tCluster\t\tSourceCluster\t\tStatus\t\tRunID\t\t\tLog\n"); - sb.append("-----------------------------------------------------------------------------------------------\n"); - if (result.getInstances() != null) { - for (InstancesResult.Instance instance : result.getInstances()) { - - toAppend = (instance.getInstance() != null) ? instance.getInstance() : "-"; - sb.append(toAppend).append("\t"); - - toAppend = instance.getCluster() != null ? instance.getCluster() : "-"; - sb.append(toAppend).append("\t"); - - toAppend = instance.getSourceCluster() != null ? instance.getSourceCluster() : "-"; - sb.append(toAppend).append("\t"); - - toAppend = (instance.getStatus() != null ? instance.getStatus().toString() : "-"); - sb.append(toAppend).append("\t"); - - toAppend = (runid != null ? runid : "latest"); - sb.append(toAppend).append("\t"); - - toAppend = instance.getLogFile() != null ? instance.getLogFile() : "-"; - sb.append(toAppend).append("\n"); - - if (instance.actions != null) { - sb.append("actions:\n"); - for (InstancesResult.InstanceAction action : instance.actions) { - sb.append(" ").append(action.getAction()).append("\t"); - sb.append(action.getStatus()).append("\t").append(action.getLogFile()).append("\n"); - } - } - } - } - sb.append("\nAdditional Information:\n"); - sb.append("Response: ").append(result.getMessage()); - sb.append("Request Id: ").append(result.getRequestId()); - return sb.toString(); - } - - private String parseFeedInstanceResult(ClientResponse clientResponse) { - FeedInstanceResult result = clientResponse.getEntity(FeedInstanceResult.class); - StringBuilder sb = new StringBuilder(); - String toAppend; - - sb.append("Consolidated Status: ").append(result.getStatus()).append("\n"); - - sb.append("\nInstances:\n"); - sb.append("Cluster\t\tInstance\t\tStatus\t\tSize\t\tCreationTime\t\tDetails\n"); - sb.append("-----------------------------------------------------------------------------------------------\n"); - if (result.getInstances() != null) { - for (FeedInstanceResult.Instance instance : result.getInstances()) { - - toAppend = instance.getCluster() != null ? instance.getCluster() : "-"; - sb.append(toAppend).append("\t"); - - toAppend = instance.getInstance() != null ? instance.getInstance() : "-"; - sb.append(toAppend).append("\t"); - - toAppend = instance.getStatus() != null ? instance.getStatus() : "-"; - sb.append(toAppend).append("\t"); - - toAppend = instance.getSize() != -1 ? String.valueOf(instance.getSize()) : "-"; - sb.append(toAppend).append("\t"); - - toAppend = instance.getCreationTime() != 0 - ? SchemaHelper.formatDateUTC(new Date(instance.getCreationTime())) : "-"; - sb.append(toAppend).append("\t"); - - toAppend = StringUtils.isEmpty(instance.getUri()) ? "-" : instance.getUri(); - sb.append(toAppend).append("\n"); - } - } - sb.append("\nAdditional Information:\n"); - sb.append("Response: ").append(result.getMessage()); - sb.append("Request Id: ").append(result.getRequestId()); - return sb.toString(); - } - public String getVertex(String id) throws FalconCLIException { return sendMetadataLineageRequest(MetadataOperations.VERTICES, id); } @@ -1068,7 +863,7 @@ public class FalconClient { return sendMetadataLineageRequest(MetadataOperations.EDGES, id); } - public String submitRecipe(String recipeName, + public APIResult submitRecipe(String recipeName, String recipeToolClassName) throws FalconCLIException { String recipePath = clientProperties.getProperty("falcon.recipe.path"); @@ -1128,7 +923,7 @@ public class FalconClient { .accept(job.mimeType) .type(job.mimeType) .method(job.method, ClientResponse.class); - return parseStringResult(clientResponse); + return clientResponse.getEntity(String.class); } private String sendMetadataLineageRequest(MetadataOperations job, String key, @@ -1140,7 +935,7 @@ public class FalconClient { .accept(job.mimeType) .type(job.mimeType) .method(job.method, ClientResponse.class); - return parseStringResult(clientResponse); + return clientResponse.getEntity(String.class); } private String sendMetadataLineageRequestForEdges(MetadataOperations job, String id, @@ -1152,7 +947,7 @@ public class FalconClient { .accept(job.mimeType) .type(job.mimeType) .method(job.method, ClientResponse.class); - return parseStringResult(clientResponse); + return clientResponse.getEntity(String.class); } private void checkIfSuccessful(ClientResponse clientResponse) throws FalconCLIException { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0d608aa7/webapp/src/main/java/org/apache/falcon/Debug.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/Debug.java b/webapp/src/main/java/org/apache/falcon/Debug.java index a650215..c606074 100644 --- a/webapp/src/main/java/org/apache/falcon/Debug.java +++ b/webapp/src/main/java/org/apache/falcon/Debug.java @@ -67,11 +67,14 @@ public final class Debug { if (ConfigurationStore.get().get(eType, dep.name) != null) { continue; } - String xml = client.getDefinition(eType.name().toLowerCase(), dep.name); + String xml = + client.getDefinition(eType.name().toLowerCase(), dep.name) + .toString(); System.out.println(xml); store(eType, xml); } - String xml = client.getDefinition(type.toLowerCase(), entity); + String xml = + client.getDefinition(type.toLowerCase(), entity).toString(); System.out.println(xml); store(EntityType.valueOf(type.toUpperCase()), xml); }