Repository: falcon Updated Branches: refs/heads/0.9 14e209da9 -> 8dee7c9ea
FALCON-1707 Code Refactoring for Falcon Client. Contributed by Ajay Yadava. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8dee7c9e Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8dee7c9e Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8dee7c9e Branch: refs/heads/0.9 Commit: 8dee7c9eac939530d26418e8ddc2ac82a7c1d8b5 Parents: 14e209d Author: Ajay Yadava <[email protected]> Authored: Tue Jan 19 20:05:32 2016 +0530 Committer: Ajay Yadava <[email protected]> Committed: Tue Jan 19 20:55:43 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../falcon/client/FalconCLIException.java | 2 +- .../org/apache/falcon/client/FalconClient.java | 915 +++++++------------ 3 files changed, 328 insertions(+), 591 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/8dee7c9e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a200b9c..532b867 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -56,6 +56,8 @@ Proposed Release Version: 0.9 FALCON-1213 Base framework of the native scheduler(Pallavi Rao) IMPROVEMENTS + FALCON-1707 Code Refactoring for Falcon Client(Ajay Yadava) + FALCON-1733 Support for building falcon with JDK 1.8 also(Narayan Periwal via Ajay Yadava) FALCON-1662 Ensure entity can be scheduled on multiple clusters on same colo (Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/8dee7c9e/client/src/main/java/org/apache/falcon/client/FalconCLIException.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconCLIException.java b/client/src/main/java/org/apache/falcon/client/FalconCLIException.java index 831f7ac..e7dfa52 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconCLIException.java +++ b/client/src/main/java/org/apache/falcon/client/FalconCLIException.java @@ -77,7 +77,7 @@ public class FalconCLIException extends Exception { return statusValue + ";" + message; } - public static FalconCLIException fromReponse(ClientResponse clientResponse, Class clazz) { + public static FalconCLIException fromReponse(ClientResponse clientResponse, Class<? extends APIResult> clazz) { return new FalconCLIException(getMessage(clientResponse, clazz)); } http://git-wip-us.apache.org/repos/asf/falcon/blob/8dee7c9e/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 16ece7a..46b32dd 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -18,11 +18,32 @@ package org.apache.falcon.client; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.client.urlconnection.HTTPSProperties; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Method; +import java.net.URL; +import java.security.SecureRandom; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManager; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; + import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.net.util.TrustManagerUtils; @@ -50,31 +71,11 @@ import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; import org.apache.hadoop.security.authentication.client.PseudoAuthenticator; - -import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; -import javax.net.ssl.TrustManager; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintStream; -import java.io.UnsupportedEncodingException; -import java.lang.reflect.Method; -import java.net.URL; -import java.security.SecureRandom; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.urlconnection.HTTPSProperties; /** * Client API to submit and manage Falcon Entities (Cluster, Feed, Process) jobs @@ -82,30 +83,61 @@ import java.util.concurrent.atomic.AtomicReference; */ public class FalconClient extends AbstractFalconClient { - public static final AtomicReference<PrintStream> OUT = new AtomicReference<PrintStream>(System.out); + public static final AtomicReference<PrintStream> OUT = new AtomicReference<>(System.out); public static final String WS_HEADER_PREFIX = "header:"; public static final String USER = System.getProperty("user.name"); public static final String AUTH_URL = "api/options?" + PseudoAuthenticator.USER_NAME + "=" + USER; + + + public static final String PATH = "path"; + public static final String COLO = "colo"; + private static final String KEY = "key"; + private static final String VALUE = "value"; + public static final String CLUSTER = "cluster"; + public static final String RUN_ID = "runid"; + public static final String FORCE = "force"; + public static final String SHOW_SCHEDULER = "showScheduler"; + public static final String ENTITY_NAME = "name"; + public static final String SKIP_DRYRUN = "skipDryRun"; + public static final String FILTER_BY = "filterBy"; + public static final String ORDER_BY = "orderBy"; + public static final String SORT_ORDER = "sortOrder"; + public static final String OFFSET = "offset"; + public static final String NUM_RESULTS = "numResults"; + public static final String START = "start"; + public static final String END = "end"; + public static final String INSTANCE_TIME = "instanceTime"; + public static final String PROPERTIES = "properties"; + private static final String FIELDS = "fields"; + private static final String NAME_SUBSEQUENCE = "nameseq"; + private static final String FILTER_TAGS = "tags"; + private static final String TAG_KEYWORDS = "tagkeys"; + private static final String LIFECYCLE = "lifecycle"; + private static final String NUM_INSTANCES = "numInstances"; + + + + + public static final String DO_AS_OPT = "doAs"; /** * Name of the HTTP cookie used for the authentication token between the client and the server. */ public static final String AUTH_COOKIE = "hadoop.auth"; private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "="; - private static final KerberosAuthenticator AUTHENTICATOR = new KerberosAuthenticator(); + private static final KerberosAuthenticator AUTHENTICATOR = new KerberosAuthenticator(); private static final String TEMPLATE_SUFFIX = "-template.xml"; - private static final String PROPERTIES_SUFFIX = ".properties"; + private static final String PROPERTIES_SUFFIX = ".properties"; public static final HostnameVerifier ALL_TRUSTING_HOSTNAME_VERIFIER = new HostnameVerifier() { @Override public boolean verify(String hostname, SSLSession sslSession) { return true; } }; - private final WebResource service; private final AuthenticatedURL.Token authenticationToken; @@ -182,10 +214,6 @@ public class FalconClient extends AbstractFalconClient { this.debugMode = debugMode; } - public Properties getClientProperties() { - return clientProperties; - } - public static AuthenticatedURL.Token getToken(String baseUrl) throws FalconCLIException { AuthenticatedURL.Token currentToken = new AuthenticatedURL.Token(); try { @@ -210,7 +238,7 @@ public class FalconClient extends AbstractFalconClient { VALIDATE("api/entities/validate/", HttpMethod.POST, MediaType.TEXT_XML), SUBMIT("api/entities/submit/", HttpMethod.POST, MediaType.TEXT_XML), UPDATE("api/entities/update/", HttpMethod.POST, MediaType.TEXT_XML), - SUBMITandSCHEDULE("api/entities/submitAndSchedule/", HttpMethod.POST, MediaType.TEXT_XML), + SUBMITANDSCHEDULE("api/entities/submitAndSchedule/", HttpMethod.POST, MediaType.TEXT_XML), SCHEDULE("api/entities/schedule/", HttpMethod.POST, MediaType.TEXT_XML), SUSPEND("api/entities/suspend/", HttpMethod.POST, MediaType.TEXT_XML), RESUME("api/entities/resume/", HttpMethod.POST, MediaType.TEXT_XML), @@ -314,183 +342,181 @@ public class FalconClient extends AbstractFalconClient { } public APIResult schedule(EntityType entityType, String entityName, String colo, - Boolean skipDryRun, String doAsUser, String properties) - throws FalconCLIException { - - return sendEntityRequest(Entities.SCHEDULE, entityType, entityName, - colo, skipDryRun, doAsUser, properties); - + Boolean skipDryRun, String doAsUser, String properties) throws FalconCLIException { + String type = entityType.toString().toLowerCase(); + ClientResponse clientResponse = new ResourceBuilder().path(Entities.SCHEDULE.path, type, entityName) + .addQueryParam(COLO, colo).addQueryParam(SKIP_DRYRUN, skipDryRun) + .addQueryParam(PROPERTIES, properties).addQueryParam(DO_AS_OPT, doAsUser).call(Entities.SCHEDULE); + return getResponse(APIResult.class, clientResponse); } public APIResult suspend(EntityType entityType, String entityName, String colo, String doAsUser) throws FalconCLIException { - - return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo, null, doAsUser, null); - + String type = entityType.toString().toLowerCase(); + ClientResponse clientResponse = new ResourceBuilder().path(Entities.SUSPEND.path, type, entityName) + .addQueryParam(COLO, colo).addQueryParam(DO_AS_OPT, doAsUser).call(Entities.SUSPEND); + return getResponse(APIResult.class, clientResponse); } public APIResult resume(EntityType entityType, String entityName, String colo, String doAsUser) throws FalconCLIException { - - return sendEntityRequest(Entities.RESUME, entityType, entityName, colo, null, doAsUser, null); - + String type = entityType.toString().toLowerCase(); + ClientResponse clientResponse = new ResourceBuilder().path(Entities.RESUME.path, type, entityName) + .addQueryParam(COLO, colo).addQueryParam(DO_AS_OPT, doAsUser).call(Entities.RESUME); + return getResponse(APIResult.class, clientResponse); } - public APIResult delete(EntityType entityType, String entityName, String doAsUser) - throws FalconCLIException { - - return sendEntityRequest(Entities.DELETE, entityType, entityName, null, null, doAsUser, null); - + public APIResult delete(EntityType entityType, String entityName, String doAsUser) throws FalconCLIException { + String type = entityType.toString().toLowerCase(); + ClientResponse clientResponse = new ResourceBuilder().path(Entities.DELETE.path, type, entityName) + .addQueryParam(DO_AS_OPT, doAsUser).call(Entities.DELETE); + return getResponse(APIResult.class, clientResponse); } public APIResult validate(String entityType, String filePath, Boolean skipDryRun, String doAsUser) throws FalconCLIException { - InputStream entityStream = getServletInputStream(filePath); - return sendEntityRequestWithObject(Entities.VALIDATE, entityType, - entityStream, null, skipDryRun, doAsUser, null); + ClientResponse clientResponse = new ResourceBuilder().path(Entities.VALIDATE.path, entityType) + .addQueryParam(SKIP_DRYRUN, skipDryRun).addQueryParam(DO_AS_OPT, doAsUser) + .call(Entities.VALIDATE, entityStream); + return getResponse(APIResult.class, clientResponse); } public APIResult submit(String entityType, String filePath, String doAsUser) throws FalconCLIException { - InputStream entityStream = getServletInputStream(filePath); - return sendEntityRequestWithObject(Entities.SUBMIT, entityType, - entityStream, null, null, doAsUser, null); + ClientResponse clientResponse = new ResourceBuilder().path(Entities.SUBMIT.path, entityType) + .addQueryParam(DO_AS_OPT, doAsUser).call(Entities.SUBMIT, entityStream); + return getResponse(APIResult.class, clientResponse); } public APIResult update(String entityType, String entityName, String filePath, - Boolean skipDryRun, String doAsUser) - throws FalconCLIException { + Boolean skipDryRun, String doAsUser) throws FalconCLIException { InputStream entityStream = getServletInputStream(filePath); Entities operation = Entities.UPDATE; - WebResource resource = service.path(operation.path).path(entityType).path(entityName); - if (null != skipDryRun) { - resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); - } - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); - } - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(operation.mimeType).type(MediaType.TEXT_XML) - .method(operation.method, ClientResponse.class, entityStream); - printClientResponse(clientResponse); - checkIfSuccessful(clientResponse); - return parseAPIResult(clientResponse); + ClientResponse clientResponse = new ResourceBuilder().path(operation.path, entityType, entityName) + .addQueryParam(SKIP_DRYRUN, skipDryRun).addQueryParam(DO_AS_OPT, doAsUser) + .call(operation, entityStream); + return getResponse(APIResult.class, clientResponse); } @Override - public APIResult submitAndSchedule(String entityType, String filePath, - Boolean skipDryRun, String doAsUser, String properties) - throws FalconCLIException { - + public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun, + String doAsUser, String properties) throws FalconCLIException { InputStream entityStream = getServletInputStream(filePath); - return sendEntityRequestWithObject(Entities.SUBMITandSCHEDULE, - entityType, entityStream, null, skipDryRun, doAsUser, properties); + ClientResponse clientResponse = new ResourceBuilder().path(Entities.SUBMITANDSCHEDULE.path, entityType) + .addQueryParam(SKIP_DRYRUN, skipDryRun).addQueryParam(DO_AS_OPT, doAsUser) + .addQueryParam(PROPERTIES, properties).call(Entities.SUBMITANDSCHEDULE, entityStream); + return getResponse(APIResult.class, clientResponse); } public APIResult getStatus(EntityType entityType, String entityName, String colo, String doAsUser, boolean showScheduler) throws FalconCLIException { + String type = entityType.toString().toLowerCase(); + ClientResponse clientResponse = new ResourceBuilder().path(Entities.STATUS.path, type, entityName) + .addQueryParam(COLO, colo).addQueryParam(DO_AS_OPT, doAsUser) + .addQueryParam(SHOW_SCHEDULER, showScheduler).call(Entities.STATUS); - return sendEntityRequest(Entities.STATUS, entityType, entityName, colo, null, doAsUser, null, showScheduler); + return getResponse(APIResult.class, clientResponse); } - public Entity getDefinition(String entityType, String entityName, String doAsUser) - throws FalconCLIException { - - return sendDefinitionRequest(Entities.DEFINITION, entityType, - entityName, doAsUser); + public Entity getDefinition(String entityType, String entityName, String doAsUser) throws FalconCLIException { + ClientResponse clientResponse = new ResourceBuilder().path(Entities.DEFINITION.path, entityType, entityName) + .call(Entities.DEFINITION); + String entity = getResponseAsString(clientResponse); + return Entity.fromString(EntityType.getEnum(entityType), entity); } public EntityList getDependency(String entityType, String entityName, String doAsUser) throws FalconCLIException { - return sendDependencyRequest(Entities.DEPENDENCY, entityType, entityName, doAsUser); + + ClientResponse clientResponse = new ResourceBuilder().path(Entities.DEPENDENCY.path, entityType, entityName) + .addQueryParam(DO_AS_OPT, doAsUser).call(Entities.DEPENDENCY); + + printClientResponse(clientResponse); + checkIfSuccessful(clientResponse); + + EntityList result = clientResponse.getEntity(EntityList.class); + if (result == null || result.getElements() == null) { + return null; + } + return result; } //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck public SchedulableEntityInstanceResult getFeedSlaMissPendingAlerts(String entityType, String entityName, String startTime, String endTime, String colo) throws FalconCLIException { - - WebResource resource = service.path(Entities.SLA.path).path(entityType).queryParam("start", startTime) - .queryParam("colo", colo); - if (endTime != null) { - resource = resource.queryParam("end", endTime); - } - if (entityName != null) { - resource = resource.queryParam("name", entityName); - } - ClientResponse clientResponse = resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(Entities.SLA.mimeType).type(MediaType.APPLICATION_JSON) - .method(Entities.SLA.method, ClientResponse.class); - checkIfSuccessful(clientResponse); - return clientResponse.getEntity(SchedulableEntityInstanceResult.class); + ClientResponse clientResponse = new ResourceBuilder().path(Entities.SLA.path, entityType) + .addQueryParam(START, startTime).addQueryParam(COLO, colo).addQueryParam(END, endTime) + .addQueryParam(ENTITY_NAME, entityName).call(Entities.SLA); + return getResponse(SchedulableEntityInstanceResult.class, clientResponse); } - public TriageResult triage(String entityType, String entityName, String instanceTime, String colo) - throws FalconCLIException { - WebResource resource = service - .path(Instances.TRIAGE.path).path(entityType).path(entityName) - .queryParam("start", instanceTime).queryParam("colo", colo); - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(Instances.TRIAGE.mimeType).type(MediaType.TEXT_XML) - .method(Instances.TRIAGE.method, ClientResponse.class); - - printClientResponse(clientResponse); - checkIfSuccessful(clientResponse); - return clientResponse.getEntity(TriageResult.class); + public TriageResult triage(String entityType, String entityName, String instanceTime, + String colo) throws FalconCLIException { + ClientResponse clientResponse = new ResourceBuilder().path(Instances.TRIAGE.path, entityType, entityName) + .addQueryParam(START, instanceTime).addQueryParam(COLO, colo).call(Instances.TRIAGE); + return getResponse(TriageResult.class, clientResponse); } @Override public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords, String filterBy, String filterTags, String orderBy, String sortOrder, Integer offset, Integer numResults, String doAsUser) throws FalconCLIException { - return sendListRequest(Entities.LIST, entityType, fields, nameSubsequence, tagKeywords, filterBy, - filterTags, orderBy, sortOrder, offset, numResults, doAsUser); + Entities operation = Entities.LIST; + ClientResponse clientResponse = new ResourceBuilder().path(operation.path, entityType) + .addQueryParam(DO_AS_OPT, doAsUser).addQueryParam(NUM_RESULTS, numResults) + .addQueryParam(OFFSET, offset).addQueryParam(SORT_ORDER, sortOrder) + .addQueryParam(ORDER_BY, orderBy).addQueryParam(FILTER_BY, filterBy) + .addQueryParam(FIELDS, fields).addQueryParam(NAME_SUBSEQUENCE, nameSubsequence) + .addQueryParam(TAG_KEYWORDS, tagKeywords).addQueryParam(FILTER_TAGS, filterTags) + .call(operation); + + printClientResponse(clientResponse); + checkIfSuccessful(clientResponse); + + EntityList result = clientResponse.getEntity(EntityList.class); + if (result == null || result.getElements() == null) { + return null; + } + return result; } @Override public EntitySummaryResult getEntitySummary(String entityType, String cluster, String start, String end, String fields, String filterBy, String filterTags, - String orderBy, String sortOrder, - Integer offset, Integer numResults, Integer numInstances, String doAsUser) - throws FalconCLIException { - return sendEntitySummaryRequest(Entities.SUMMARY, entityType, cluster, start, end, fields, filterBy, filterTags, - orderBy, sortOrder, offset, numResults, numInstances, doAsUser); + String orderBy, String sortOrder, Integer offset, Integer numResults, + Integer numInstances, String doAsUser) throws FalconCLIException { + ClientResponse clientResponse = new ResourceBuilder().path(Entities.SUMMARY.path, entityType) + .addQueryParam(CLUSTER, cluster).addQueryParam(START, start).addQueryParam(END, end) + .addQueryParam(SORT_ORDER, sortOrder).addQueryParam(ORDER_BY, orderBy) + .addQueryParam(OFFSET, offset).addQueryParam(NUM_RESULTS, numResults) + .addQueryParam(DO_AS_OPT, doAsUser).addQueryParam(FILTER_BY, filterBy) + .addQueryParam(NUM_INSTANCES, numInstances).addQueryParam(FIELDS, fields) + .addQueryParam(FILTER_TAGS, filterTags).call(Entities.SUMMARY); + return getResponse(EntitySummaryResult.class, clientResponse); } @Override public APIResult touch(String entityType, String entityName, String colo, Boolean skipDryRun, String doAsUser) throws FalconCLIException { Entities operation = Entities.TOUCH; - WebResource resource = service.path(operation.path).path(entityType).path(entityName); - if (colo != null) { - resource = resource.queryParam("colo", colo); - } - if (null != skipDryRun) { - resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); - } - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); - } - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(operation.mimeType).type(MediaType.TEXT_XML) - .method(operation.method, ClientResponse.class); - printClientResponse(clientResponse); - checkIfSuccessful(clientResponse); - return parseAPIResult(clientResponse); + ClientResponse clientResponse = new ResourceBuilder().path(operation.path, entityType, entityName) + .addQueryParam(COLO, colo).addQueryParam(SKIP_DRYRUN, skipDryRun) + .addQueryParam(DO_AS_OPT, doAsUser).call(operation); + return getResponse(APIResult.class, clientResponse); } public InstancesResult getRunningInstances(String type, String entity, String colo, List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, Integer offset, Integer numResults, String doAsUser) throws FalconCLIException { - - return sendInstanceRequest(Instances.RUNNING, type, entity, null, null, - null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, doAsUser) - .getEntity(InstancesResult.class); + ClientResponse clientResponse = new ResourceBuilder().path(Instances.RUNNING.path, type, entity) + .addQueryParam(FILTER_BY, filterBy).addQueryParam(ORDER_BY, orderBy) + .addQueryParam(SORT_ORDER, sortOrder).addQueryParam(OFFSET, offset) + .addQueryParam(NUM_RESULTS, numResults).addQueryParam(COLO, colo) + .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(USER, doAsUser).call(Instances.RUNNING); + return getResponse(InstancesResult.class, clientResponse); } public InstancesResult getStatusOfInstances(String type, String entity, @@ -498,10 +524,13 @@ public class FalconClient extends AbstractFalconClient { String colo, List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, Integer offset, Integer numResults, String doAsUser) throws FalconCLIException { - - return sendInstanceRequest(Instances.STATUS, type, entity, start, end, - null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, doAsUser) - .getEntity(InstancesResult.class); + ClientResponse clientResponse = new ResourceBuilder().path(Instances.RUNNING.path, type, entity) + .addQueryParam(FILTER_BY, filterBy).addQueryParam(ORDER_BY, orderBy) + .addQueryParam(SORT_ORDER, sortOrder).addQueryParam(OFFSET, offset) + .addQueryParam(START, start).addQueryParam(END, end) + .addQueryParam(NUM_RESULTS, numResults).addQueryParam(COLO, colo) + .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(USER, doAsUser).call(Instances.RUNNING); + return getResponse(InstancesResult.class, clientResponse); } public InstancesSummaryResult getSummaryOfInstances(String type, String entity, @@ -509,48 +538,47 @@ public class FalconClient extends AbstractFalconClient { String colo, List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, String doAsUser) throws FalconCLIException { - - return sendInstanceRequest(Instances.SUMMARY, type, entity, start, end, null, - null, colo, lifeCycles, filterBy, orderBy, sortOrder, 0, null, doAsUser) - .getEntity(InstancesSummaryResult.class); + ClientResponse clientResponse = new ResourceBuilder().path(Instances.SUMMARY.path, type, entity) + .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) + .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(USER, doAsUser).call(Instances.SUMMARY); + return getResponse(InstancesSummaryResult.class, clientResponse); } public FeedInstanceResult getFeedListing(String type, String entity, String start, - String end, String colo, String doAsUser) - throws FalconCLIException { - - return sendInstanceRequest(Instances.LISTING, type, entity, start, end, null, - null, colo, null, "", "", "", 0, null, doAsUser).getEntity(FeedInstanceResult.class); + String end, String colo, String doAsUser) throws FalconCLIException { + ClientResponse clientResponse = new ResourceBuilder().path(Instances.KILL.path, type, entity) + .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) + .addQueryParam(USER, doAsUser).call(Instances.LISTING); + return getResponse(FeedInstanceResult.class, clientResponse); } public InstancesResult killInstances(String type, String entity, String start, String end, String colo, String clusters, String sourceClusters, List<LifeCycle> lifeCycles, - String doAsUser) - throws FalconCLIException, UnsupportedEncodingException { - - return sendInstanceRequest(Instances.KILL, type, entity, start, end, - getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles, doAsUser); + String doAsUser) throws FalconCLIException, UnsupportedEncodingException { + InputStream props = getServletInputStream(clusters, sourceClusters, null); + ClientResponse clientResponse = new ResourceBuilder().path(Instances.KILL.path, type, entity) + .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) + .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(USER, doAsUser).call(Instances.KILL, props); + return getResponse(InstancesResult.class, clientResponse); } - public InstancesResult suspendInstances(String type, String entity, String start, - String end, String colo, String clusters, - String sourceClusters, List<LifeCycle> lifeCycles, - String doAsUser) - throws FalconCLIException, UnsupportedEncodingException { - - return sendInstanceRequest(Instances.SUSPEND, type, entity, start, end, - getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles, doAsUser); + public InstancesResult suspendInstances(String type, String entity, String start, String end, String colo, + String clusters, String sourceClusters, List<LifeCycle> lifeCycles, + String doAsUser) throws FalconCLIException, UnsupportedEncodingException { + ClientResponse clientResponse = new ResourceBuilder().path(Instances.SUSPEND.path, type, entity) + .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) + .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(USER, doAsUser).call(Instances.SUSPEND); + return getResponse(InstancesResult.class, clientResponse); } - public InstancesResult resumeInstances(String type, String entity, String start, - String end, String colo, String clusters, - String sourceClusters, List<LifeCycle> lifeCycles, - String doAsUser) - throws FalconCLIException, UnsupportedEncodingException { - - return sendInstanceRequest(Instances.RESUME, type, entity, start, end, - getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles, doAsUser); + public InstancesResult resumeInstances(String type, String entity, String start, String end, String colo, + String clusters, String sourceClusters, List<LifeCycle> lifeCycles, + String doAsUser) throws FalconCLIException, UnsupportedEncodingException { + ClientResponse clientResponse = new ResourceBuilder().path(Instances.RESUME.path, type, entity) + .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) + .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(USER, doAsUser).call(Instances.RESUME); + return getResponse(InstancesResult.class, clientResponse); } public InstancesResult rerunInstances(String type, String entity, String start, @@ -574,8 +602,12 @@ public class FalconClient extends AbstractFalconClient { } } String temp = (buffer.length() == 0) ? null : buffer.toString(); - return sendInstanceRequest(Instances.RERUN, type, entity, start, end, - getServletInputStream(clusters, sourceClusters, temp), null, colo, lifeCycles, isForced, doAsUser); + InputStream props = getServletInputStream(clusters, sourceClusters, temp); + ClientResponse clientResponse = new ResourceBuilder().path(Instances.RERUN.path, type, entity) + .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) + .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(FORCE, isForced) + .addQueryParam(USER, doAsUser).call(Instances.RERUN, props); + return getResponse(InstancesResult.class, clientResponse); } public InstancesResult getLogsOfInstances(String type, String entity, String start, @@ -583,9 +615,13 @@ public class FalconClient extends AbstractFalconClient { List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, Integer offset, Integer numResults, String doAsUser) throws FalconCLIException { - return sendInstanceRequest(Instances.LOG, type, entity, start, end, - null, runId, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, doAsUser) - .getEntity(InstancesResult.class); + ClientResponse clientResponse = new ResourceBuilder().path(Instances.LOG.path, type, entity) + .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) + .addQueryParam(RUN_ID, runId).addQueryParam(LIFECYCLE, lifeCycles, type) + .addQueryParam(FILTER_BY, filterBy).addQueryParam(ORDER_BY, orderBy) + .addQueryParam(SORT_ORDER, sortOrder).addQueryParam(OFFSET, offset) + .addQueryParam(NUM_RESULTS, numResults).addQueryParam(USER, doAsUser).call(Instances.LOG); + return getResponse(InstancesResult.class, clientResponse); } public InstancesResult getParamsOfInstance(String type, String entity, @@ -597,8 +633,11 @@ public class FalconClient extends AbstractFalconClient { throw new FalconCLIException("Start date is mandatory and should be" + " a valid date in YYYY-MM-DDTHH:MMZ format."); } - return sendInstanceRequest(Instances.PARAMS, type, entity, - start, null, null, null, colo, lifeCycles, doAsUser); + + ClientResponse clientResponse = new ResourceBuilder().path(Instances.PARAMS.path, type, entity) + .addQueryParam(START, start).addQueryParam(LIFECYCLE, lifeCycles, type) + .addQueryParam(USER, doAsUser).call(Instances.PARAMS); + return getResponse(InstancesResult.class, clientResponse); } public String getThreadDump(String doAsUser) throws FalconCLIException { @@ -612,17 +651,8 @@ public class FalconClient extends AbstractFalconClient { public int getStatus(String doAsUser) throws FalconCLIException { AdminOperations job = AdminOperations.VERSION; - - WebResource resource = service.path(job.path); - - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); - } - - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(job.mimeType).type(MediaType.TEXT_PLAIN) - .method(job.method, ClientResponse.class); + ClientResponse clientResponse = new ResourceBuilder().path(job.path).addQueryParam(DO_AS_OPT, doAsUser) + .call(job); printClientResponse(clientResponse); return clientResponse.getStatus(); } @@ -639,18 +669,8 @@ public class FalconClient extends AbstractFalconClient { public LineageGraphResult getEntityLineageGraph(String pipelineName, String doAsUser) throws FalconCLIException { MetadataOperations operation = MetadataOperations.LINEAGE; - - WebResource resource = service.path(operation.path) - .queryParam(FalconMetadataCLI.PIPELINE_OPT, pipelineName); - - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); - } - - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(operation.mimeType).type(operation.mimeType) - .method(operation.method, ClientResponse.class); + ClientResponse clientResponse = new ResourceBuilder().path(operation.path).addQueryParam(DO_AS_OPT, doAsUser) + .addQueryParam(FalconMetadataCLI.PIPELINE_OPT, pipelineName).call(operation); printClientResponse(clientResponse); checkIfSuccessful(clientResponse); return clientResponse.getEntity(LineageGraphResult.class); @@ -683,314 +703,119 @@ public class FalconClient extends AbstractFalconClient { return stream; } - private APIResult sendEntityRequest(Entities entities, EntityType entityType, String entityName, - String colo, Boolean skipDryRun, String doAsUser, String properties, - boolean showScheduler) throws FalconCLIException { - WebResource resource = service.path(entities.path) - .path(entityType.toString().toLowerCase()).path(entityName); - if (colo != null) { - resource = resource.queryParam("colo", colo); - } - if (null != skipDryRun) { - resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); - } - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); - } - - if (StringUtils.isNotEmpty(properties)) { - resource = resource.queryParam("properties", properties); - } - - resource = resource.queryParam("showScheduler", Boolean.toString(showScheduler)); - - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(entities.mimeType).type(MediaType.TEXT_XML) - .method(entities.method, ClientResponse.class); - + private <T extends APIResult> T getResponse(Class<T> clazz, + ClientResponse clientResponse) throws FalconCLIException { printClientResponse(clientResponse); + checkIfSuccessful(clientResponse, clazz); + return clientResponse.getEntity(clazz); + } + private String getResponseAsString(ClientResponse clientResponse) throws FalconCLIException { + printClientResponse(clientResponse); checkIfSuccessful(clientResponse); - - // should be removed return parseAPIResult(clientResponse); - return clientResponse.getEntity(APIResult.class); + return clientResponse.getEntity(String.class); } - private APIResult sendEntityRequest(Entities entities, EntityType entityType, - String entityName, String colo, Boolean skipDryRun, - String doAsUser, String properties) throws FalconCLIException { - return sendEntityRequest(entities, entityType, entityName, colo, skipDryRun, doAsUser, properties, false); - } + private class ResourceBuilder { + WebResource resource; - private WebResource addParamsToResource(WebResource resource, - String start, String end, String runId, String colo, - String fields, String nameSubsequence, String tagKeywords, String filterBy, - String tags, String orderBy, String sortOrder, Integer offset, - Integer numResults, Integer numInstances, Boolean isForced, - String doAsUser) { - if (StringUtils.isNotEmpty(fields)) { - resource = resource.queryParam("fields", fields); - } - if (StringUtils.isNotEmpty(nameSubsequence)) { - resource = resource.queryParam("nameseq", nameSubsequence); - } - if (StringUtils.isNotEmpty(tagKeywords)) { - resource = resource.queryParam("tagkeys", tagKeywords); - } - if (StringUtils.isNotEmpty(tags)) { - resource = resource.queryParam("tags", tags); - } - if (StringUtils.isNotEmpty(filterBy)) { - resource = resource.queryParam("filterBy", filterBy); - } - if (StringUtils.isNotEmpty(orderBy)) { - resource = resource.queryParam("orderBy", orderBy); - } - if (StringUtils.isNotEmpty(sortOrder)) { - resource = resource.queryParam("sortOrder", sortOrder); - } - if (StringUtils.isNotEmpty(start)) { - resource = resource.queryParam("start", start); - } - if (StringUtils.isNotEmpty(end)) { - resource = resource.queryParam("end", end); - } - if (runId != null) { - resource = resource.queryParam("runid", runId); - } - if (colo != null) { - resource = resource.queryParam("colo", colo); - } - if (offset != null) { - resource = resource.queryParam("offset", offset.toString()); - } - if (numResults != null) { - resource = resource.queryParam("numResults", numResults.toString()); - } - if (numInstances != null) { - resource = resource.queryParam("numInstances", numInstances.toString()); - } - if (isForced != null) { - resource = resource.queryParam("force", String.valueOf(isForced)); + private ResourceBuilder path(String... paths) { + for (String path : paths) { + if (resource == null) { + resource = service.path(path); + } else { + resource = resource.path(path); + } + } + return this; } - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); + public ResourceBuilder addQueryParam(String paramName, Integer value) { + if (value != null) { + resource = resource.queryParam(paramName, value.toString()); + } + return this; } - return resource; - } - - private EntitySummaryResult sendEntitySummaryRequest(Entities entities, String entityType, String cluster, - String start, String end, - String fields, String filterBy, String filterTags, - String orderBy, String sortOrder, Integer offset, Integer numResults, - Integer numInstances, String doAsUser) throws FalconCLIException { - WebResource resource = service.path(entities.path).path(entityType); - if (StringUtils.isNotEmpty(cluster)) { - resource = resource.queryParam("cluster", cluster); + public ResourceBuilder addQueryParam(String paramName, Boolean paramValue) { + if (paramValue != null) { + resource = resource.queryParam(paramName, String.valueOf(paramValue)); + } + return this; } - resource = addParamsToResource(resource, start, end, null, null, - fields, null, null, filterBy, filterTags, - orderBy, sortOrder, - offset, numResults, numInstances, null, doAsUser); - - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(entities.mimeType).type(MediaType.TEXT_XML) - .method(entities.method, ClientResponse.class); - - printClientResponse(clientResponse); - - checkIfSuccessful(clientResponse); - return clientResponse.getEntity(EntitySummaryResult.class); - } - //RESUME CHECKSTYLE CHECK ParameterNumberCheck - - private Entity sendDefinitionRequest(Entities entities, String entityType, - String entityName, String doAsUser) throws FalconCLIException { + public ResourceBuilder addQueryParam(String paramName, String paramValue) { + if (StringUtils.isNotBlank(paramValue)) { + resource = resource.queryParam(paramName, paramValue); + } + return this; + } - WebResource resource = service.path(entities.path).path(entityType).path(entityName); - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); + public ResourceBuilder addQueryParam(String paramName, List<LifeCycle> lifeCycles, + String type) throws FalconCLIException { + if (lifeCycles != null) { + checkLifeCycleOption(lifeCycles, type); + for (LifeCycle lifeCycle : lifeCycles) { + resource = resource.queryParam(paramName, lifeCycle.toString()); + } + } + return this; } - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + private ClientResponse call(Entities entities) { + return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) .accept(entities.mimeType).type(MediaType.TEXT_XML) .method(entities.method, ClientResponse.class); - - printClientResponse(clientResponse); - - checkIfSuccessful(clientResponse); - String entity = clientResponse.getEntity(String.class); - - return Entity.fromString(EntityType.getEnum(entityType), entity); - - } - - private EntityList sendDependencyRequest(Entities entities, String entityType, - String entityName, String doAsUser) throws FalconCLIException { - - WebResource resource = service.path(entities.path).path(entityType).path(entityName); - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); } - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(entities.mimeType).type(MediaType.TEXT_XML) - .method(entities.method, ClientResponse.class); - printClientResponse(clientResponse); - - checkIfSuccessful(clientResponse); - - return parseEntityList(clientResponse); - } - - private APIResult sendEntityRequestWithObject(Entities entities, String entityType, - Object requestObject, String colo, Boolean skipDryRun, - String doAsUser, String properties) throws FalconCLIException { - WebResource resource = service.path(entities.path) - .path(entityType); - if (colo != null) { - resource = resource.queryParam("colo", colo); + public ClientResponse call(AdminOperations operation) { + return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(operation.mimeType).type(MediaType.TEXT_XML) + .method(operation.method, ClientResponse.class); } - if (null != skipDryRun) { - resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); + private ClientResponse call(MetadataOperations metadataOperations) { + return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(metadataOperations.mimeType).type(MediaType.TEXT_XML) + .method(metadataOperations.method, ClientResponse.class); } - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); + public ClientResponse call(Instances operation) { + return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(operation.mimeType).type(MediaType.TEXT_XML) + .method(operation.method, ClientResponse.class); } - if (StringUtils.isNotEmpty(properties)) { - resource = resource.queryParam("properties", properties); + public ClientResponse call(Entities operation, InputStream entityStream) { + return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(operation.mimeType).type(MediaType.TEXT_XML) + .method(operation.method, ClientResponse.class, entityStream); } - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(entities.mimeType).type(MediaType.TEXT_XML) - .method(entities.method, ClientResponse.class, requestObject); - - printClientResponse(clientResponse); - - checkIfSuccessful(clientResponse); - - //remove this return parseAPIResult(clientResponse); - return clientResponse.getEntity(APIResult.class); + public ClientResponse call(Instances operation, InputStream entityStream) { + return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(operation.mimeType).type(MediaType.TEXT_XML) + .method(operation.method, ClientResponse.class, entityStream); + } } public FeedLookupResult reverseLookUp(String type, String path, String doAsUser) throws FalconCLIException { Entities api = Entities.LOOKUP; - WebResource resource = service.path(api.path).path(type); - resource = resource.queryParam("path", path); - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); - } - ClientResponse response = resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(api.mimeType) - .method(api.method, ClientResponse.class); - printClientResponse(response); - checkIfSuccessful(response); - return response.getEntity(FeedLookupResult.class); - } - //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck - private InstancesResult sendInstanceRequest(Instances instances, String type, - String entity, String start, String end, InputStream props, - String runid, String colo, - List<LifeCycle> lifeCycles, String doAsUser) throws FalconCLIException { - return sendInstanceRequest(instances, type, entity, start, end, props, - runid, colo, lifeCycles, "", "", "", 0, null, doAsUser) - .getEntity(InstancesResult.class); + ClientResponse response = new ResourceBuilder().path(api.path, type).addQueryParam(DO_AS_OPT, doAsUser) + .addQueryParam(PATH, path).call(api); + return getResponse(FeedLookupResult.class, response); } - private InstancesResult sendInstanceRequest(Instances instances, String type, - String entity, String start, String end, InputStream props, - String runid, String colo, List<LifeCycle> lifeCycles, - Boolean isForced, String doAsUser) throws FalconCLIException { - return sendInstanceRequest(instances, type, entity, start, end, props, - runid, colo, lifeCycles, "", "", "", 0, null, isForced, doAsUser).getEntity(InstancesResult.class); - } - - - - private ClientResponse sendInstanceRequest(Instances instances, String type, String entity, - String start, String end, InputStream props, String runid, String colo, - List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder, - Integer offset, Integer numResults, String doAsUser) throws FalconCLIException { - - return sendInstanceRequest(instances, type, entity, start, end, props, - runid, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, null, doAsUser); - } - - private ClientResponse sendInstanceRequest(Instances instances, String type, String entity, - String start, String end, InputStream props, String runid, String colo, - List<LifeCycle> lifeCycles, String filterBy, String orderBy, - String sortOrder, Integer offset, Integer numResults, Boolean isForced, - String doAsUser) throws FalconCLIException { - checkType(type); - WebResource resource = service.path(instances.path).path(type) - .path(entity); - - resource = addParamsToResource(resource, start, end, runid, colo, - null, null, null, filterBy, null, orderBy, sortOrder, offset, numResults, null, isForced, doAsUser); - - if (lifeCycles != null) { - checkLifeCycleOption(lifeCycles, type); - for (LifeCycle lifeCycle : lifeCycles) { - resource = resource.queryParam("lifecycle", lifeCycle.toString()); - } - } - - ClientResponse clientResponse; - if (props == null) { - clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(instances.mimeType) - .method(instances.method, ClientResponse.class); - } else { - clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(instances.mimeType) - .method(instances.method, ClientResponse.class, props); - } - printClientResponse(clientResponse); - checkIfSuccessful(clientResponse); - return clientResponse; - } - - public FeedInstanceResult getFeedInstanceListing(String type, String entity, String start, String end, String colo , String doAsUser) throws FalconCLIException { checkType(type); Instances api = Instances.LISTING; - WebResource resource = service.path(api.path).path(type).path(entity); - if (colo != null) { - resource = resource.queryParam("colo", colo); - } - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); - } - if (StringUtils.isNotEmpty(start)){ - resource = resource.queryParam("start", start); - } - if (StringUtils.isNotEmpty(end)) { - resource = resource.queryParam("end", end); - } - - ClientResponse clientResponse = resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(api.mimeType) - .method(api.method, ClientResponse.class); - - printClientResponse(clientResponse); - checkIfSuccessful(clientResponse, FeedInstanceResult.class); - return clientResponse.getEntity(FeedInstanceResult.class); + ClientResponse clientResponse = new ResourceBuilder().path(api.path, type, entity) + .addQueryParam(COLO, colo).addQueryParam(DO_AS_OPT, doAsUser).addQueryParam(START, start) + .addQueryParam(END, end).call(api); + return getResponse(FeedInstanceResult.class, clientResponse); } @@ -998,17 +823,9 @@ public class FalconClient extends AbstractFalconClient { String colo) throws FalconCLIException { checkType(entityType); Instances api = Instances.DEPENDENCY; - - WebResource resource = service.path(api.path).path(entityType).path(entityName); - resource = resource.queryParam("instanceTime", instanceTime); - resource = resource.queryParam("colo", colo); - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(api.mimeType) - .method(api.method, ClientResponse.class); - printClientResponse(clientResponse); - checkIfSuccessful(clientResponse); - return clientResponse.getEntity(InstanceDependencyResult.class); + ClientResponse clientResponse = new ResourceBuilder().path(api.path, entityType, entityName) + .addQueryParam(COLO, colo).addQueryParam(INSTANCE_TIME, instanceTime).call(api); + return getResponse(InstanceDependencyResult.class, clientResponse); } //RESUME CHECKSTYLE CHECK VisibilityModifierCheck @@ -1036,43 +853,10 @@ public class FalconClient extends AbstractFalconClient { } } - //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck - private EntityList sendListRequest(Entities entities, String entityType, String fields, String nameSubsequence, - String tagKeywords, String filterBy, String filterTags, String orderBy, - String sortOrder, Integer offset, Integer numResults, String doAsUser - ) throws FalconCLIException { - WebResource resource = service.path(entities.path) - .path(entityType); - resource = addParamsToResource(resource, null, null, null, null, fields, nameSubsequence, tagKeywords, - filterBy, filterTags, orderBy, sortOrder, offset, numResults, null, null, doAsUser); - - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(entities.mimeType).type(MediaType.TEXT_XML) - .method(entities.method, ClientResponse.class); - - printClientResponse(clientResponse); - - checkIfSuccessful(clientResponse); - - return parseEntityList(clientResponse); - } - // RESUME CHECKSTYLE CHECK ParameterNumberCheck - private String sendAdminRequest(AdminOperations job, String doAsUser) throws FalconCLIException { - WebResource resource = service.path(job.path); - - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); - } - - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(job.mimeType) - .type(job.mimeType) - .method(job.method, ClientResponse.class); - printClientResponse(clientResponse); - return clientResponse.getEntity(String.class); + ClientResponse clientResponse = new ResourceBuilder().path(job.path).addQueryParam(DO_AS_OPT, doAsUser) + .call(job); + return getResponseAsString(clientResponse); } private String sendRequestForReplicationMetrics(final MetadataOperations operation, final String schedEntityType, @@ -1150,22 +934,6 @@ public class FalconClient extends AbstractFalconClient { return clientResponse.getEntity(String.class); } - private APIResult parseAPIResult(ClientResponse clientResponse) - throws FalconCLIException { - - return clientResponse.getEntity(APIResult.class); - } - - private EntityList parseEntityList(ClientResponse clientResponse) - throws FalconCLIException { - - EntityList result = clientResponse.getEntity(EntityList.class); - if (result == null || result.getElements() == null) { - return null; - } - return result; - - } public String getVertex(String id, String doAsUser) throws FalconCLIException { return sendMetadataLineageRequest(MetadataOperations.VERTICES, id, doAsUser); @@ -1197,11 +965,8 @@ public class FalconClient extends AbstractFalconClient { return recipePath; } - public APIResult submitRecipe(String recipeName, - String recipeToolClassName, - final String recipeOperation, - String recipePropertiesFile, - Boolean skipDryRun, + public APIResult submitRecipe(String recipeName, String recipeToolClassName, + final String recipeOperation, String recipePropertiesFile, Boolean skipDryRun, final String doAsUser) throws FalconCLIException { String recipePath = getRecipePath(recipePropertiesFile); @@ -1258,54 +1023,25 @@ public class FalconClient extends AbstractFalconClient { private String sendMetadataLineageRequest(MetadataOperations job, String id, String doAsUser) throws FalconCLIException { - WebResource resource = service.path(job.path).path(id); - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); - } - - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(job.mimeType) - .type(job.mimeType) - .method(job.method, ClientResponse.class); - printClientResponse(clientResponse); - return clientResponse.getEntity(String.class); + ClientResponse clientResponse = new ResourceBuilder().path(job.path, id).addQueryParam(DO_AS_OPT, doAsUser) + .call(job); + return getResponseAsString(clientResponse); } private String sendMetadataLineageRequest(MetadataOperations job, String key, String value, String doAsUser) throws FalconCLIException { - WebResource resource = service.path(job.path); - - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); - } - ClientResponse clientResponse = resource.queryParam("key", key) - .queryParam("value", value) - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(job.mimeType) - .type(job.mimeType) - .method(job.method, ClientResponse.class); - printClientResponse(clientResponse); - return clientResponse.getEntity(String.class); + ClientResponse clientResponse = new ResourceBuilder().path(job.path).addQueryParam(DO_AS_OPT, doAsUser) + .addQueryParam(KEY, key).addQueryParam(VALUE, value).call(job); + return getResponseAsString(clientResponse); } private String sendMetadataLineageRequestForEdges(MetadataOperations job, String id, String direction, String doAsUser) throws FalconCLIException { - WebResource resource = service.path(job.path) - .path(id).path(direction); - - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); - } - - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(job.mimeType) - .type(job.mimeType) - .method(job.method, ClientResponse.class); - printClientResponse(clientResponse); - return clientResponse.getEntity(String.class); + ClientResponse clientResponse = new ResourceBuilder().path(job.path, id, direction) + .addQueryParam(DO_AS_OPT, doAsUser).call(job); + return getResponseAsString(clientResponse); } + /* * Donot use this getMessage use the overloaded one * with clazz as param for better error handling @@ -1318,15 +1054,14 @@ public class FalconClient extends AbstractFalconClient { } } - private void checkIfSuccessful(ClientResponse clientResponse, Class clazz) throws FalconCLIException { + private void checkIfSuccessful(ClientResponse clientResponse, + Class<? extends APIResult> clazz) throws FalconCLIException { Response.Status.Family statusFamily = clientResponse.getClientResponseStatus().getFamily(); - if (statusFamily != Response.Status.Family.SUCCESSFUL - && statusFamily != Response.Status.Family.INFORMATIONAL) { + if (statusFamily != Response.Status.Family.SUCCESSFUL && statusFamily != Response.Status.Family.INFORMATIONAL) { throw FalconCLIException.fromReponse(clientResponse, clazz); } } - private void printClientResponse(ClientResponse clientResponse) { if (getDebugMode()) { OUT.get().println(clientResponse.toString());
