// http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java deleted file mode 100644 index 4f86d9b..0000000 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ /dev/null @@ -1,466 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.falcon.client;

import org.apache.falcon.LifeCycle;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.EntitySummaryResult;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.InstanceDependencyResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Abstract Client API to submit and manage Falcon Entities (Cluster, Feed, Process) jobs
 * against a Falcon instance.
 */
public abstract class AbstractFalconClient {

    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck

    /** Key under which the target clusters are written into the request body. */
    protected static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
    /** Key under which the source clusters are written into the request body. */
    protected static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters";

    /**
     * Submit a new entity. Entities can be of type feed, process or data end
     * points. Entity definitions are validated structurally against schema and
     * subsequently for other rules before they are admitted into the system.
     * @param entityType Entity type. Valid options are cluster, feed or process.
     * @param filePath Path for the entity definition
     * @param doAsUser proxy user
     * @return Result of the submit command.
     * @throws FalconCLIException if the operation fails
     * @throws IOException presumably when the entity definition cannot be read; confirm against the implementation
     */
    public abstract APIResult submit(String entityType, String filePath, String doAsUser)
        throws FalconCLIException, IOException;

    /**
     * Schedules a submitted process entity immediately.
     * @param entityType Entity type. Valid options are cluster, feed or process.
     * @param entityName Name of the entity.
     * @param colo Cluster name.
     * @param skipDryRun Optional query param, Falcon skips oozie dryrun when value is set to true.
     * @param doAsuser proxy user
     * @param properties additional properties sent with the request
     *                   (NOTE(review): expected format is not visible in this class; confirm against the implementation)
     * @return Result of the schedule command.
     * @throws FalconCLIException if the operation fails
     */
    public abstract APIResult schedule(EntityType entityType, String entityName, String colo, Boolean skipDryRun,
                                       String doAsuser, String properties) throws FalconCLIException;

    /**
     * Delete the specified entity.
     * @param entityType Entity type. Valid options are cluster, feed or process.
     * @param entityName Name of the entity.
     * @param doAsUser Proxy User.
     * @return Result of the delete command.
     * @throws FalconCLIException if the operation fails
     */
    public abstract APIResult delete(EntityType entityType, String entityName,
                                     String doAsUser) throws FalconCLIException;

    /**
     * Validates the submitted entity.
     * @param entityType Entity type. Valid options are cluster, feed or process.
     * @param filePath Path for the entity definition to validate.
     * @param skipDryRun Optional query param, Falcon skips oozie dryrun when value is set to true.
     * @param doAsUser Proxy User.
     * @return Result of the validation.
     * @throws FalconCLIException if the operation fails
     */
    public abstract APIResult validate(String entityType, String filePath, Boolean skipDryRun,
                                       String doAsUser) throws FalconCLIException;

    /**
     * Updates the submitted entity.
     * @param entityType Entity type. Valid options are cluster, feed or process.
     * @param entityName Name of the entity.
     * @param filePath Path for the entity definition to update.
     * @param skipDryRun Optional query param, Falcon skips oozie dryrun when value is set to true.
     * @param doAsUser Proxy User.
     * @return Result of the update command.
     * @throws FalconCLIException if the operation fails
     */
    public abstract APIResult update(String entityType, String entityName, String filePath,
                                     Boolean skipDryRun, String doAsUser) throws FalconCLIException;

    /**
     * Get definition of the entity.
     * @param entityType Entity type. Valid options are cluster, feed or process.
     * @param entityName Name of the entity.
     * @param doAsUser Proxy user.
     * @return Definition of the entity.
     * @throws FalconCLIException if the operation fails
     */
    public abstract Entity getDefinition(String entityType, String entityName,
                                         String doAsUser) throws FalconCLIException;

    /**
     * Get status of the instances of an entity.
     * @param type entity type
     * @param entity entity name
     * @param start start time
     * @param end end time
     * @param colo colo name
     * @param lifeCycles lifecycle of an entity (for ex : feed has replication,eviction).
     * @param filterBy filter operation can be applied to results
     * @param orderBy field by which results should be ordered
     * @param sortOrder sort order can be asc or desc
     * @param offset offset while displaying results
     * @param numResults num of Results to output
     * @param doAsUser proxy user
     * @param allAttempts To get the instances corresponding to each run-id
     * @return Status of the matching instances.
     * @throws FalconCLIException if the operation fails
     */
    public abstract InstancesResult getStatusOfInstances(String type, String entity, String start, String end,
                                                         String colo, List<LifeCycle> lifeCycles, String filterBy,
                                                         String orderBy, String sortOrder, Integer offset,
                                                         Integer numResults, String doAsUser, Boolean allAttempts)
        throws FalconCLIException;

    /**
     * Suspend an entity.
     * @param entityType Valid options are feed or process.
     * @param entityName Name of the entity.
     * @param colo Colo on which the query should be run.
     * @param doAsUser proxy user
     * @return Status of the entity.
     * @throws FalconCLIException if the operation fails
     */
    public abstract APIResult suspend(EntityType entityType, String entityName, String colo, String doAsUser)
        throws FalconCLIException;

    /**
     * Resume a suspended entity.
     * @param entityType Valid options are feed or process.
     * @param entityName Name of the entity.
     * @param colo Colo on which the query should be run.
     * @param doAsUser proxy user
     * @return Result of the resume command.
     * @throws FalconCLIException if the operation fails
     */
    public abstract APIResult resume(EntityType entityType, String entityName, String colo, String doAsUser)
        throws FalconCLIException;

    /**
     * Get status of the entity.
     * @param entityType Valid options are feed or process.
     * @param entityName Name of the entity.
     * @param colo Colo on which the query should be run.
     * @param doAsUser proxy user
     * @param showScheduler whether the call should return the scheduler on which the entity is scheduled.
     * @return Status of the entity.
     * @throws FalconCLIException if the operation fails
     */
    public abstract APIResult getStatus(EntityType entityType, String entityName, String colo, String doAsUser,
                                        boolean showScheduler) throws FalconCLIException;

    /**
     * Submits and schedules an entity.
     * @param entityType Valid options are feed or process.
     * @param filePath Path for the entity definition
     * @param skipDryRun Optional query param, Falcon skips oozie dryrun when value is set to true.
     * @param doAsUser proxy user
     * @param properties additional properties sent with the request
     *                   (NOTE(review): expected format is not visible in this class; confirm against the implementation)
     * @return Result of the submit and schedule command.
     * @throws FalconCLIException if the operation fails
     */
    public abstract APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun,
                                                String doAsUser, String properties) throws FalconCLIException;

    /**
     * Get list of the entities.
     * We have two filtering parameters for entity tags: "tags" and "tagkeys".
     * "tags" does the exact match in key=value fashion, while "tagkeys" finds all the entities with the given key as a
     * substring in the tags. This "tagkeys" filter is introduced for the user who doesn't remember the exact tag but
     * some keywords in the tag. It also helps users to save the time of typing long tags.
     * The returned entities will match all the filtering criteria.
     * @param entityType Comma-separated entity types. Can be empty. Valid entity types are cluster, feed or process.
     * @param fields (optional) Fields of entity that the user wants to view, separated by commas.
     *               Valid options are STATUS, TAGS, PIPELINES, CLUSTERS.
     * @param nameSubsequence (optional) Subsequence of entity name. Not case sensitive.
     *                        The entity name needs to contain all the characters in the subsequence in the same order.
     *                        Example 1: "sample1" will match the entity named "SampleFeed1-2".
     *                        Example 2: "mhs" will match the entity named "New-My-Hourly-Summary".
     * @param tagKeywords (optional) Keywords in tags, separated by comma. Not case sensitive.
     *                    The returned entities will have tags that match all the tag keywords.
     * @param filterTags (optional) Return list of entities that have specified tags, separated by a comma.
     *                   Query will do AND on tag values.
     *                   Example: tags=consumer=consumer@xyz.com,owner=producer@xyz.com
     * @param filterBy (optional) Filter results by list of field:value pairs.
     *                 Example: filterBy=STATUS:RUNNING,PIPELINES:clickLogs
     *                 Supported filter fields are NAME, STATUS, PIPELINES, CLUSTER.
     *                 Query will do an AND among filterBy fields.
     * @param orderBy (optional) Field by which results should be ordered.
     *                Supports ordering by "name".
     * @param sortOrder (optional) Valid options are "asc" and "desc"
     * @param offset (optional) Show results from the offset, used for pagination. Defaults to 0.
     * @param numResults (optional) Number of results to show per request, used for pagination. Only
     *                   integers greater than 0 are valid, Default is 10.
     * @param doAsUser proxy user
     * @return Total number of results and a list of entities.
     * @throws FalconCLIException if the operation fails
     */
    public abstract EntityList getEntityList(String entityType, String fields, String nameSubsequence,
                                             String tagKeywords, String filterBy, String filterTags, String orderBy,
                                             String sortOrder, Integer offset, Integer numResults, String doAsUser)
        throws FalconCLIException;

    /**
     * Given an EntityType and cluster, get list of entities along with summary of N recent instances of each entity.
     * @param entityType Valid options are feed or process.
     * @param cluster Show entities that belong to this cluster.
     * @param start (optional) Show entity summaries from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
     *              By default, it is set to (end - 2 days).
     * @param end (optional) Show entity summary up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
     *            Default is set to now.
     * @param fields (optional) Fields of entity that the user wants to view, separated by commas.
     *               Valid options are STATUS, TAGS, PIPELINES.
     * @param filterBy (optional) Filter results by list of field:value pairs.
     *                 Example: filterBy=STATUS:RUNNING,PIPELINES:clickLogs
     *                 Supported filter fields are NAME, STATUS, PIPELINES, CLUSTER.
     *                 Query will do an AND among filterBy fields.
     * @param filterTags (optional) Return list of entities that have specified tags, separated by a comma.
     *                   Query will do AND on tag values.
     *                   Example: tags=consumer=consumer@xyz.com,owner=producer@xyz.com
     * @param orderBy (optional) Field by which results should be ordered.
     *                Supports ordering by "name".
     * @param sortOrder (optional) Valid options are "asc" and "desc"
     * @param offset (optional) Show results from the offset, used for pagination. Defaults to 0.
     * @param numResults (optional) Number of results to show per request, used for pagination. Only
     *                   integers greater than 0 are valid, Default is 10.
     * @param numInstances (optional) Number of recent instances to show per entity. Only integers greater than 0
     *                     are valid, Default is 7.
     * @param doAsUser proxy user
     * @return Show entities along with summary of N instances for each entity.
     * @throws FalconCLIException if the operation fails
     */
    public abstract EntitySummaryResult getEntitySummary(String entityType, String cluster, String start, String end,
                                                         String fields, String filterBy, String filterTags,
                                                         String orderBy, String sortOrder, Integer offset,
                                                         Integer numResults, Integer numInstances, String doAsUser)
        throws FalconCLIException;

    /**
     * Force updates the entity.
     * @param entityType Valid options are feed or process.
     * @param entityName Name of the entity.
     * @param colo Colo on which the query should be run.
     * @param skipDryRun Optional query param, Falcon skips oozie dryrun when value is set to true.
     * @param doAsUser proxy user
     * @return Result of the force update.
     * @throws FalconCLIException if the operation fails
     */
    public abstract APIResult touch(String entityType, String entityName, String colo, Boolean skipDryRun,
                                    String doAsUser) throws FalconCLIException;

    /**
     * Kill currently running instance(s) of an entity.
     * @param type Valid options are feed or process.
     * @param entity name of the entity.
     * @param start start time of the instance(s) that you want to refer to
     * @param end end time of the instance(s) that you want to refer to
     * @param colo Colo on which the query should be run.
     * @param clusters clusters the action applies to
     *                 (NOTE(review): presumably forwarded under {@link #FALCON_INSTANCE_ACTION_CLUSTERS}; confirm)
     * @param sourceClusters source clusters the action applies to
     *                 (NOTE(review): presumably forwarded under {@link #FALCON_INSTANCE_SOURCE_CLUSTERS}; confirm)
     * @param lifeCycles (optional) can be Eviction/Replication(default) for feed and Execution(default) for
     *                   process.
     * @param doAsUser proxy user
     * @return Result of the kill operation.
     * @throws FalconCLIException if the operation fails
     * @throws UnsupportedEncodingException if a required character encoding is not supported
     */
    public abstract InstancesResult killInstances(String type, String entity, String start, String end, String colo,
                                                  String clusters, String sourceClusters, List<LifeCycle> lifeCycles,
                                                  String doAsUser)
        throws FalconCLIException, UnsupportedEncodingException;

    /**
     * Suspend instances of an entity.
     * @param type Valid options are feed or process.
     * @param entity name of the entity.
     * @param start the start time of the instance(s) that you want to refer to
     * @param end the end time of the instance(s) that you want to refer to
     * @param colo Colo on which the query should be run.
     * @param clusters clusters the action applies to
     *                 (NOTE(review): presumably forwarded under {@link #FALCON_INSTANCE_ACTION_CLUSTERS}; confirm)
     * @param sourceClusters source clusters the action applies to
     *                 (NOTE(review): presumably forwarded under {@link #FALCON_INSTANCE_SOURCE_CLUSTERS}; confirm)
     * @param lifeCycles (optional) can be Eviction/Replication(default) for feed and Execution(default) for
     *                   process.
     * @param doAsUser proxy user
     * @return Results of the suspend command.
     * @throws FalconCLIException if the operation fails
     * @throws UnsupportedEncodingException if a required character encoding is not supported
     */
    public abstract InstancesResult suspendInstances(String type, String entity, String start, String end,
                                                     String colo, String clusters, String sourceClusters,
                                                     List<LifeCycle> lifeCycles, String doAsUser)
        throws FalconCLIException, UnsupportedEncodingException;

    /**
     * Resume suspended instances of an entity.
     * @param type Valid options are feed or process.
     * @param entity name of the entity.
     * @param start start time of the instance(s) that you want to refer to
     * @param end the end time of the instance(s) that you want to refer to
     * @param colo Colo on which the query should be run.
     * @param clusters clusters the action applies to
     *                 (NOTE(review): presumably forwarded under {@link #FALCON_INSTANCE_ACTION_CLUSTERS}; confirm)
     * @param sourceClusters source clusters the action applies to
     *                 (NOTE(review): presumably forwarded under {@link #FALCON_INSTANCE_SOURCE_CLUSTERS}; confirm)
     * @param lifeCycles (optional) can be Eviction/Replication(default) for feed and Execution(default) for
     *                   process.
     * @param doAsUser proxy user
     * @return Results of the resume command.
     * @throws FalconCLIException if the operation fails
     * @throws UnsupportedEncodingException if a required character encoding is not supported
     */
    public abstract InstancesResult resumeInstances(String type, String entity, String start, String end,
                                                    String colo, String clusters, String sourceClusters,
                                                    List<LifeCycle> lifeCycles, String doAsUser)
        throws FalconCLIException, UnsupportedEncodingException;

    /**
     * Rerun instances of an entity. On issuing a rerun, by default the execution resumes from the last failed node in
     * the workflow.
     * @param type Valid options are feed or process.
     * @param entity name of the entity.
     * @param start start is the start time of the instance that you want to refer to
     * @param end end is the end time of the instance that you want to refer to
     * @param filePath path of a file supplied with the rerun request
     *                 (NOTE(review): presumably a properties file; confirm against the implementation)
     * @param colo Colo on which the query should be run.
     * @param clusters clusters the action applies to
     *                 (NOTE(review): presumably forwarded under {@link #FALCON_INSTANCE_ACTION_CLUSTERS}; confirm)
     * @param sourceClusters source clusters the action applies to
     *                 (NOTE(review): presumably forwarded under {@link #FALCON_INSTANCE_SOURCE_CLUSTERS}; confirm)
     * @param lifeCycles (optional) can be Eviction/Replication(default) for feed and Execution(default) for
     *                   process.
     * @param isForced (optional) can be used to forcefully rerun the entire instance.
     * @param doAsUser proxy user
     * @return Results of the rerun command.
     * @throws FalconCLIException if the operation fails
     * @throws IOException presumably when the supplied file cannot be read; confirm against the implementation
     */
    public abstract InstancesResult rerunInstances(String type, String entity, String start, String end,
                                                   String filePath, String colo, String clusters,
                                                   String sourceClusters, List<LifeCycle> lifeCycles,
                                                   Boolean isForced, String doAsUser)
        throws FalconCLIException, IOException;

    /**
     * Get summary of instance/instances of an entity.
     * @param type Valid options are cluster, feed or process.
     * @param entity Name of the entity.
     * @param start (optional) Show instances from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
     *              By default, it is set to (end - (10 * entityFrequency)).
     * @param end (optional) Show instances up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
     *            Default is set to now.
     * @param colo (optional) Colo on which the query should be run.
     * @param lifeCycles (optional) Valid lifecycles for feed are Eviction/Replication(default) and for process
     *                   is Execution(default).
     * @param filterBy (optional) Filter results by list of field:value pairs.
     *                 Example1: filterBy=STATUS:RUNNING,CLUSTER:primary-cluster
     *                 Example2: filterBy=Status:RUNNING,Status:KILLED
     *                 Supported filter fields are STATUS, CLUSTER.
     *                 Query will do an AND among filterBy fields.
     * @param orderBy (optional) Field by which results should be ordered.
     *                Supports ordering by "cluster". Example: orderBy=cluster
     * @param sortOrder (optional) Valid options are "asc" and "desc". Example: sortOrder=asc
     * @param doAsUser proxy user
     * @return Summary of the instances over the specified time range
     * @throws FalconCLIException if the operation fails
     */
    public abstract InstancesSummaryResult getSummaryOfInstances(String type, String entity, String start,
                                                                 String end, String colo,
                                                                 List<LifeCycle> lifeCycles, String filterBy,
                                                                 String orderBy, String sortOrder, String doAsUser)
        throws FalconCLIException;

    /**
     * Get falcon feed instance availability.
     * @param type The only valid option is feed.
     * @param entity Name of the entity.
     * @param start (optional) Show instances from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
     *              By default, it is set to (end - (10 * entityFrequency)).
     * @param end (optional) Show instances up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
     *            Default is set to now.
     * @param colo Colo on which the query should be run.
     * @param doAsUser proxy user
     * @return Feed instance availability status
     * @throws FalconCLIException if the operation fails
     */
    public abstract FeedInstanceResult getFeedListing(String type, String entity, String start, String end,
                                                      String colo, String doAsUser) throws FalconCLIException;

    /**
     * Get log of a specific instance of an entity.
     * @param type Valid options are cluster, feed or process.
     * @param entity Name of the entity.
     * @param start (optional) Show instances from this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
     *              By default, it is set to (end - (10 * entityFrequency)).
     * @param end (optional) Show instances up to this date. Date format is yyyy-MM-dd'T'HH:mm'Z'.
     *            Default is set to now.
     * @param colo (optional) Colo on which the query should be run.
     * @param runId (optional) Run Id.
     * @param lifeCycles (optional) Valid lifecycles for feed are Eviction/Replication(default) and for process is
     *                   Execution(default).
     * @param filterBy (optional) Filter results by list of field:value pairs.
     *                 Example: filterBy=STATUS:RUNNING,CLUSTER:primary-cluster
     *                 Supported filter fields are STATUS, CLUSTER, SOURCECLUSTER, STARTEDAFTER.
     *                 Query will do an AND among filterBy fields.
     * @param orderBy (optional) Field by which results should be ordered.
     *                Supports ordering by "status","startTime","endTime","cluster".
     * @param sortOrder (optional) Valid options are "asc" and "desc"
     * @param offset (optional) Show results from the offset, used for pagination. Defaults to 0.
     * @param numResults (optional) Number of results to show per request, used for pagination. Only integers
     *                   greater than 0 are valid, Default is 10.
     * @param doAsUser proxy user
     * @return Log of specified instance.
     * @throws FalconCLIException if the operation fails
     */
    public abstract InstancesResult getLogsOfInstances(String type, String entity, String start, String end,
                                                       String colo, String runId, List<LifeCycle> lifeCycles,
                                                       String filterBy, String orderBy, String sortOrder,
                                                       Integer offset, Integer numResults, String doAsUser)
        throws FalconCLIException;

    //RESUME CHECKSTYLE CHECK ParameterNumberCheck

    /**
     * Get the params passed to the workflow for an instance of feed/process.
     * @param type Valid options are cluster, feed or process.
     * @param entity Name of the entity.
     * @param start should be the nominal time of the instance for which you want the params to be returned
     * @param colo (optional) Colo on which the query should be run.
     * @param lifeCycles (optional) Valid lifecycles for feed are Eviction/Replication(default) and for process is
     *                   Execution(default).
     * @param doAsUser proxy user
     * @return Params passed to the workflow for the specified instance.
     * @throws FalconCLIException if the operation fails
     * @throws UnsupportedEncodingException if a required character encoding is not supported
     */
    public abstract InstancesResult getParamsOfInstance(String type, String entity, String start, String colo,
                                                        List<LifeCycle> lifeCycles, String doAsUser)
        throws FalconCLIException, UnsupportedEncodingException;

    /**
     * Get dependent instances for a particular instance.
     * @param entityType Valid options are feed or process.
     * @param entityName Name of the entity
     * @param instanceTime (mandatory) time of the given instance
     * @param colo Colo on which the query should be run.
     * @return Dependent instances for the specified instance
     * @throws FalconCLIException if the operation fails
     */
    public abstract InstanceDependencyResult getInstanceDependencies(String entityType, String entityName,
                                                                     String instanceTime, String colo)
        throws FalconCLIException;

    /**
     * Get version of the falcon server.
     * @param doAsUser proxy user
     * @return Version of the server.
     * @throws FalconCLIException if the operation fails
     */
    public abstract String getVersion(String doAsUser) throws FalconCLIException;

    /**
     * Builds the request body that carries the cluster selection and any extra properties.
     *
     * <p>Each non-null cluster argument is written as one {@code key=value} line, keyed by
     * {@link #FALCON_INSTANCE_ACTION_CLUSTERS} and {@link #FALCON_INSTANCE_SOURCE_CLUSTERS} respectively;
     * {@code properties}, when non-null, is appended verbatim after them.
     *
     * @param clusters value written under {@link #FALCON_INSTANCE_ACTION_CLUSTERS}; skipped when null
     * @param sourceClusters value written under {@link #FALCON_INSTANCE_SOURCE_CLUSTERS}; skipped when null
     * @param properties text appended as-is after the cluster lines; skipped when null
     * @return a stream over the UTF-8 encoded body, or {@code null} when nothing was written
     * @throws FalconCLIException never thrown by this implementation; kept so overriding subclasses may throw it
     * @throws UnsupportedEncodingException never thrown by this implementation; kept for source compatibility
     */
    protected InputStream getServletInputStream(String clusters, String sourceClusters, String properties)
        throws FalconCLIException, UnsupportedEncodingException {

        StringBuilder buffer = new StringBuilder();
        if (clusters != null) {
            buffer.append(FALCON_INSTANCE_ACTION_CLUSTERS).append('=').append(clusters).append('\n');
        }
        if (sourceClusters != null) {
            buffer.append(FALCON_INSTANCE_SOURCE_CLUSTERS).append('=').append(sourceClusters).append('\n');
        }
        if (properties != null) {
            buffer.append(properties);
        }
        if (buffer.length() == 0) {
            // Nothing to send: callers receive null rather than an empty stream.
            return null;
        }
        // Use a fixed charset so the bytes on the wire do not depend on the JVM's platform default.
        // NOTE(review): confirm the server side decodes the request body as UTF-8.
        return new ByteArrayInputStream(buffer.toString().getBytes(StandardCharsets.UTF_8));
    }
}
// http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/client/src/main/java/org/apache/falcon/client/FalconCLIException.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconCLIException.java b/client/src/main/java/org/apache/falcon/client/FalconCLIException.java deleted file mode 100644 index 29efbae..0000000 --- a/client/src/main/java/org/apache/falcon/client/FalconCLIException.java +++ /dev/null @@ -1,69 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.falcon.client;

import com.sun.jersey.api.client.ClientResponse;
import org.apache.falcon.resource.APIResult;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/**
 * Exception thrown by FalconClient.
 */
public class FalconCLIException extends Exception {

    /** One mebibyte; used both as the mark read-limit and as the cap on raw body bytes read. */
    private static final int MB = 1024 * 1024;

    /**
     * Creates an exception with the given message.
     *
     * @param msg detail message
     */
    public FalconCLIException(String msg) {
        super(msg);
    }

    /**
     * Creates an exception wrapping the given cause.
     *
     * @param e underlying cause
     */
    public FalconCLIException(Throwable e) {
        super(e);
    }

    /**
     * Creates an exception with the given message and cause.
     *
     * @param msg detail message
     * @param throwable underlying cause
     */
    public FalconCLIException(String msg, Throwable throwable) {
        super(msg, throwable);
    }

    /**
     * Builds an exception describing a failed HTTP response.
     *
     * <p>The message has the form {@code <status>;<detail>}. The detail is only populated for
     * {@code BAD_REQUEST} responses: the entity is first read as an {@link APIResult}; if that fails for
     * any reason, up to one mebibyte of the raw body is used instead.
     *
     * <p>The method name keeps its historical spelling because it is public API.
     *
     * @param clientResponse response received from the server
     * @return exception carrying the status and, for {@code BAD_REQUEST}, the server's message
     */
    public static FalconCLIException fromReponse(ClientResponse clientResponse) {
        ClientResponse.Status status = clientResponse.getClientResponseStatus();
        // The status enum may be null when the numeric code has no mapping; fall back to the
        // raw code instead of failing with a NullPointerException while reporting an error.
        String statusValue = (status != null) ? status.toString() : String.valueOf(clientResponse.getStatus());
        String message = "";
        if (status == ClientResponse.Status.BAD_REQUEST) {
            // Buffer first so the entity stream can be rewound after a failed unmarshal.
            clientResponse.bufferEntity();
            InputStream in = clientResponse.getEntityInputStream();
            try {
                in.mark(MB);
                message = clientResponse.getEntity(APIResult.class).getMessage();
            } catch (Throwable th) {
                // Deliberately broad: any failure to parse the entity falls back to the raw body
                // so that the caller still sees what the server sent.
                message = readRawBody(in);
            }
        }
        return new FalconCLIException(statusValue + ";" + message);
    }

    /**
     * Rewinds the stream to its mark and returns up to one mebibyte of it as text.
     *
     * @param in marked entity stream
     * @return the decoded body, an empty string when the body is empty, or the I/O error message
     */
    private static String readRawBody(InputStream in) {
        byte[] data = new byte[MB];
        try {
            in.reset();
            int len = in.read(data);
            // read() returns -1 at end of stream; guard so an empty body yields "" instead of
            // a StringIndexOutOfBoundsException that would hide the real server error.
            // NOTE(review): UTF-8 is assumed for the response body; confirm against the server.
            return (len > 0) ? new String(data, 0, len, StandardCharsets.UTF_8) : "";
        } catch (IOException e) {
            return e.getMessage();
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java deleted file mode 100644 index 597f608..0000000 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ /dev/null @@ -1,1057 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.client; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintStream; -import java.io.UnsupportedEncodingException; -import java.lang.reflect.Method; -import java.net.URL; -import java.security.SecureRandom; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; - -import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; -import javax.net.ssl.TrustManager; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.net.util.TrustManagerUtils; -import org.apache.falcon.LifeCycle; -import org.apache.falcon.cli.FalconCLI; -import org.apache.falcon.cli.FalconMetadataCLI; -import org.apache.falcon.entity.v0.DateValidator; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.metadata.RelationshipType; -import org.apache.falcon.recipe.RecipeTool; -import org.apache.falcon.recipe.RecipeToolArgs; -import org.apache.falcon.resource.APIResult; -import org.apache.falcon.resource.EntityList; -import org.apache.falcon.resource.EntitySummaryResult; -import 
org.apache.falcon.resource.FeedInstanceResult; -import org.apache.falcon.resource.FeedLookupResult; -import org.apache.falcon.resource.InstanceDependencyResult; -import org.apache.falcon.resource.InstancesResult; -import org.apache.falcon.resource.InstancesSummaryResult; -import org.apache.falcon.resource.LineageGraphResult; -import org.apache.falcon.resource.SchedulableEntityInstanceResult; -import org.apache.falcon.resource.TriageResult; -import org.apache.hadoop.security.authentication.client.AuthenticatedURL; -import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; -import org.apache.hadoop.security.authentication.client.PseudoAuthenticator; - -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.client.urlconnection.HTTPSProperties; - -/** - * Client API to submit and manage Falcon Entities (Cluster, Feed, Process) jobs - * against an Falcon instance. - */ -public class FalconClient extends AbstractFalconClient { - - public static final AtomicReference<PrintStream> OUT = new AtomicReference<>(System.out); - - public static final String WS_HEADER_PREFIX = "header:"; - public static final String USER = System.getProperty("user.name"); - public static final String AUTH_URL = "api/options?" 
+ PseudoAuthenticator.USER_NAME + "=" + USER; - - - - public static final String PATH = "path"; - public static final String COLO = "colo"; - private static final String KEY = "key"; - private static final String VALUE = "value"; - public static final String CLUSTER = "cluster"; - public static final String RUN_ID = "runid"; - public static final String FORCE = "force"; - public static final String SHOW_SCHEDULER = "showScheduler"; - public static final String ENTITY_NAME = "name"; - public static final String SKIP_DRYRUN = "skipDryRun"; - public static final String FILTER_BY = "filterBy"; - public static final String ORDER_BY = "orderBy"; - public static final String SORT_ORDER = "sortOrder"; - public static final String OFFSET = "offset"; - public static final String NUM_RESULTS = "numResults"; - public static final String START = "start"; - public static final String END = "end"; - public static final String INSTANCE_TIME = "instanceTime"; - public static final String PROPERTIES = "properties"; - private static final String FIELDS = "fields"; - private static final String NAME_SUBSEQUENCE = "nameseq"; - private static final String FILTER_TAGS = "tags"; - private static final String TAG_KEYWORDS = "tagkeys"; - private static final String LIFECYCLE = "lifecycle"; - private static final String NUM_INSTANCES = "numInstances"; - public static final String ALL_ATTEMPTS = "allAttempts"; - - - - - public static final String DO_AS_OPT = "doAs"; - /** - * Name of the HTTP cookie used for the authentication token between the client and the server. 
- */ - public static final String AUTH_COOKIE = "hadoop.auth"; - private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "="; - - private static final KerberosAuthenticator AUTHENTICATOR = new KerberosAuthenticator(); - private static final String TEMPLATE_SUFFIX = "-template.xml"; - - - private static final String PROPERTIES_SUFFIX = ".properties"; - public static final HostnameVerifier ALL_TRUSTING_HOSTNAME_VERIFIER = new HostnameVerifier() { - @Override - public boolean verify(String hostname, SSLSession sslSession) { - return true; - } - }; - private final WebResource service; - private final AuthenticatedURL.Token authenticationToken; - - /** - * debugMode=false means no debugging. debugMode=true means debugging on. - */ - private boolean debugMode = false; - - private final Properties clientProperties; - - /** - * Create a Falcon client instance. - * - * @param falconUrl of the server to which client interacts - * @throws FalconCLIException - If unable to initialize SSL Props - */ - public FalconClient(String falconUrl) throws FalconCLIException { - this(falconUrl, new Properties()); - } - - /** - * Create a Falcon client instance. 
- * - * @param falconUrl of the server to which client interacts - * @param properties client properties - * @throws FalconCLIException - If unable to initialize SSL Props - */ - public FalconClient(String falconUrl, Properties properties) throws FalconCLIException { - try { - String baseUrl = notEmpty(falconUrl, "FalconUrl"); - if (!baseUrl.endsWith("/")) { - baseUrl += "/"; - } - this.clientProperties = properties; - SSLContext sslContext = getSslContext(); - DefaultClientConfig config = new DefaultClientConfig(); - config.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES, - new HTTPSProperties(ALL_TRUSTING_HOSTNAME_VERIFIER, sslContext) - ); - Client client = Client.create(config); - client.setConnectTimeout(Integer.parseInt(clientProperties.getProperty("falcon.connect.timeout", - "180000"))); - client.setReadTimeout(Integer.parseInt(clientProperties.getProperty("falcon.read.timeout", "180000"))); - service = client.resource(UriBuilder.fromUri(baseUrl).build()); - client.resource(UriBuilder.fromUri(baseUrl).build()); - authenticationToken = getToken(baseUrl); - } catch (Exception e) { - throw new FalconCLIException("Unable to initialize Falcon Client object. Cause : " + e.getMessage(), e); - } - } - - private static SSLContext getSslContext() throws Exception { - SSLContext sslContext = SSLContext.getInstance("SSL"); - sslContext.init( - null, - new TrustManager[]{TrustManagerUtils.getValidateServerCertificateTrustManager()}, - new SecureRandom()); - return sslContext; - } - - /** - * @return current debug Mode - */ - public boolean getDebugMode() { - return debugMode; - } - - /** - * Set debug mode. - * - * @param debugMode : debugMode=false means no debugging. 
debugMode=true means debugging on - */ - public void setDebugMode(boolean debugMode) { - this.debugMode = debugMode; - } - - public static AuthenticatedURL.Token getToken(String baseUrl) throws FalconCLIException { - AuthenticatedURL.Token currentToken = new AuthenticatedURL.Token(); - try { - URL url = new URL(baseUrl + AUTH_URL); - // using KerberosAuthenticator which falls back to PsuedoAuthenticator - // instead of passing authentication type from the command line - bad factory - HttpsURLConnection.setDefaultSSLSocketFactory(getSslContext().getSocketFactory()); - HttpsURLConnection.setDefaultHostnameVerifier(ALL_TRUSTING_HOSTNAME_VERIFIER); - new AuthenticatedURL(AUTHENTICATOR).openConnection(url, currentToken); - } catch (Exception ex) { - throw new FalconCLIException("Could not authenticate, " + ex.getMessage(), ex); - } - - return currentToken; - } - - - /** - * Methods allowed on Entity Resources. - */ - protected static enum Entities { - VALIDATE("api/entities/validate/", HttpMethod.POST, MediaType.TEXT_XML), - SUBMIT("api/entities/submit/", HttpMethod.POST, MediaType.TEXT_XML), - UPDATE("api/entities/update/", HttpMethod.POST, MediaType.TEXT_XML), - SUBMITANDSCHEDULE("api/entities/submitAndSchedule/", HttpMethod.POST, MediaType.TEXT_XML), - SCHEDULE("api/entities/schedule/", HttpMethod.POST, MediaType.TEXT_XML), - SUSPEND("api/entities/suspend/", HttpMethod.POST, MediaType.TEXT_XML), - RESUME("api/entities/resume/", HttpMethod.POST, MediaType.TEXT_XML), - DELETE("api/entities/delete/", HttpMethod.DELETE, MediaType.TEXT_XML), - STATUS("api/entities/status/", HttpMethod.GET, MediaType.TEXT_XML), - DEFINITION("api/entities/definition/", HttpMethod.GET, MediaType.TEXT_XML), - LIST("api/entities/list/", HttpMethod.GET, MediaType.TEXT_XML), - SUMMARY("api/entities/summary", HttpMethod.GET, MediaType.APPLICATION_JSON), - LOOKUP("api/entities/lookup/", HttpMethod.GET, MediaType.APPLICATION_JSON), - DEPENDENCY("api/entities/dependencies/", HttpMethod.GET, 
MediaType.TEXT_XML), - SLA("api/entities/sla-alert", HttpMethod.GET, MediaType.APPLICATION_JSON), - TOUCH("api/entities/touch", HttpMethod.POST, MediaType.TEXT_XML); - - private String path; - private String method; - private String mimeType; - - Entities(String path, String method, String mimeType) { - this.path = path; - this.method = method; - this.mimeType = mimeType; - } - } - - /** - * Methods allowed on Metadata Discovery Resources. - */ - protected static enum MetadataOperations { - - LIST("api/metadata/discovery/", HttpMethod.GET, MediaType.APPLICATION_JSON), - RELATIONS("api/metadata/discovery/", HttpMethod.GET, MediaType.APPLICATION_JSON), - VERTICES("api/metadata/lineage/vertices", HttpMethod.GET, MediaType.APPLICATION_JSON), - EDGES("api/metadata/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON), - LINEAGE("api/metadata/lineage/entities", HttpMethod.GET, MediaType.APPLICATION_JSON); - - private String path; - private String method; - private String mimeType; - - MetadataOperations(String path, String method, String mimeType) { - this.path = path; - this.method = method; - this.mimeType = mimeType; - } - } - - /** - * Methods allowed on Process Instance Resources. 
- */ - protected static enum Instances { - RUNNING("api/instance/running/", HttpMethod.GET, MediaType.APPLICATION_JSON), - STATUS("api/instance/status/", HttpMethod.GET, MediaType.APPLICATION_JSON), - LIST("api/instance/list", HttpMethod.GET, MediaType.APPLICATION_JSON), - KILL("api/instance/kill/", HttpMethod.POST, MediaType.APPLICATION_JSON), - SUSPEND("api/instance/suspend/", HttpMethod.POST, MediaType.APPLICATION_JSON), - RESUME("api/instance/resume/", HttpMethod.POST, MediaType.APPLICATION_JSON), - RERUN("api/instance/rerun/", HttpMethod.POST, MediaType.APPLICATION_JSON), - LOG("api/instance/logs/", HttpMethod.GET, MediaType.APPLICATION_JSON), - SUMMARY("api/instance/summary/", HttpMethod.GET, MediaType.APPLICATION_JSON), - PARAMS("api/instance/params/", HttpMethod.GET, MediaType.APPLICATION_JSON), - DEPENDENCY("api/instance/dependencies/", HttpMethod.GET, MediaType.APPLICATION_JSON), - TRIAGE("api/instance/triage/", HttpMethod.GET, MediaType.APPLICATION_JSON), - LISTING("api/instance/listing/", HttpMethod.GET, MediaType.APPLICATION_JSON); - - private String path; - private String method; - private String mimeType; - - Instances(String path, String method, String mimeType) { - this.path = path; - this.method = method; - this.mimeType = mimeType; - } - } - - protected static enum AdminOperations { - - STACK("api/admin/stack", HttpMethod.GET, MediaType.TEXT_PLAIN), - VERSION("api/admin/version", HttpMethod.GET, MediaType.APPLICATION_JSON); - - private String path; - private String method; - private String mimeType; - - AdminOperations(String path, String method, String mimeType) { - this.path = path; - this.method = method; - this.mimeType = mimeType; - } - } - - public String notEmpty(String str, String name) { - if (str == null) { - - throw new IllegalArgumentException(name + " cannot be null"); - } - if (str.length() == 0) { - throw new IllegalArgumentException(name + " cannot be empty"); - } - return str; - } - - public APIResult schedule(EntityType 
entityType, String entityName, String colo, - Boolean skipDryRun, String doAsUser, String properties) throws FalconCLIException { - String type = entityType.toString().toLowerCase(); - ClientResponse clientResponse = new ResourceBuilder().path(Entities.SCHEDULE.path, type, entityName) - .addQueryParam(COLO, colo).addQueryParam(SKIP_DRYRUN, skipDryRun) - .addQueryParam(PROPERTIES, properties).addQueryParam(DO_AS_OPT, doAsUser).call(Entities.SCHEDULE); - return getResponse(APIResult.class, clientResponse); - } - - public APIResult suspend(EntityType entityType, String entityName, String colo, String doAsUser) - throws FalconCLIException { - String type = entityType.toString().toLowerCase(); - ClientResponse clientResponse = new ResourceBuilder().path(Entities.SUSPEND.path, type, entityName) - .addQueryParam(COLO, colo).addQueryParam(DO_AS_OPT, doAsUser).call(Entities.SUSPEND); - return getResponse(APIResult.class, clientResponse); - } - - public APIResult resume(EntityType entityType, String entityName, String colo, String doAsUser) - throws FalconCLIException { - String type = entityType.toString().toLowerCase(); - ClientResponse clientResponse = new ResourceBuilder().path(Entities.RESUME.path, type, entityName) - .addQueryParam(COLO, colo).addQueryParam(DO_AS_OPT, doAsUser).call(Entities.RESUME); - return getResponse(APIResult.class, clientResponse); - } - - public APIResult delete(EntityType entityType, String entityName, String doAsUser) throws FalconCLIException { - String type = entityType.toString().toLowerCase(); - ClientResponse clientResponse = new ResourceBuilder().path(Entities.DELETE.path, type, entityName) - .addQueryParam(DO_AS_OPT, doAsUser).call(Entities.DELETE); - return getResponse(APIResult.class, clientResponse); - } - - public APIResult validate(String entityType, String filePath, Boolean skipDryRun, String doAsUser) - throws FalconCLIException { - InputStream entityStream = getServletInputStream(filePath); - ClientResponse clientResponse = new 
ResourceBuilder().path(Entities.VALIDATE.path, entityType) - .addQueryParam(SKIP_DRYRUN, skipDryRun).addQueryParam(DO_AS_OPT, doAsUser) - .call(Entities.VALIDATE, entityStream); - return getResponse(APIResult.class, clientResponse); - } - - public APIResult submit(String entityType, String filePath, String doAsUser) - throws FalconCLIException { - InputStream entityStream = getServletInputStream(filePath); - ClientResponse clientResponse = new ResourceBuilder().path(Entities.SUBMIT.path, entityType) - .addQueryParam(DO_AS_OPT, doAsUser).call(Entities.SUBMIT, entityStream); - return getResponse(APIResult.class, clientResponse); - } - - public APIResult update(String entityType, String entityName, String filePath, - Boolean skipDryRun, String doAsUser) throws FalconCLIException { - InputStream entityStream = getServletInputStream(filePath); - Entities operation = Entities.UPDATE; - ClientResponse clientResponse = new ResourceBuilder().path(operation.path, entityType, entityName) - .addQueryParam(SKIP_DRYRUN, skipDryRun).addQueryParam(DO_AS_OPT, doAsUser) - .call(operation, entityStream); - return getResponse(APIResult.class, clientResponse); - } - - @Override - public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun, - String doAsUser, String properties) throws FalconCLIException { - InputStream entityStream = getServletInputStream(filePath); - ClientResponse clientResponse = new ResourceBuilder().path(Entities.SUBMITANDSCHEDULE.path, entityType) - .addQueryParam(SKIP_DRYRUN, skipDryRun).addQueryParam(DO_AS_OPT, doAsUser) - .addQueryParam(PROPERTIES, properties).call(Entities.SUBMITANDSCHEDULE, entityStream); - return getResponse(APIResult.class, clientResponse); - } - - public APIResult getStatus(EntityType entityType, String entityName, String colo, - String doAsUser, boolean showScheduler) throws FalconCLIException { - String type = entityType.toString().toLowerCase(); - ClientResponse clientResponse = new 
ResourceBuilder().path(Entities.STATUS.path, type, entityName) - .addQueryParam(COLO, colo).addQueryParam(DO_AS_OPT, doAsUser) - .addQueryParam(SHOW_SCHEDULER, showScheduler).call(Entities.STATUS); - - return getResponse(APIResult.class, clientResponse); - } - - public Entity getDefinition(String entityType, String entityName, String doAsUser) throws FalconCLIException { - ClientResponse clientResponse = new ResourceBuilder().path(Entities.DEFINITION.path, entityType, entityName) - .call(Entities.DEFINITION); - String entity = getResponseAsString(clientResponse); - return Entity.fromString(EntityType.getEnum(entityType), entity); - } - - public EntityList getDependency(String entityType, String entityName, String doAsUser) - throws FalconCLIException { - - ClientResponse clientResponse = new ResourceBuilder().path(Entities.DEPENDENCY.path, entityType, entityName) - .addQueryParam(DO_AS_OPT, doAsUser).call(Entities.DEPENDENCY); - - printClientResponse(clientResponse); - checkIfSuccessful(clientResponse); - - EntityList result = clientResponse.getEntity(EntityList.class); - if (result == null || result.getElements() == null) { - return null; - } - return result; - } - - //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck - - public SchedulableEntityInstanceResult getFeedSlaMissPendingAlerts(String entityType, String entityName, - String startTime, String endTime, String colo) throws FalconCLIException { - ClientResponse clientResponse = new ResourceBuilder().path(Entities.SLA.path, entityType) - .addQueryParam(START, startTime).addQueryParam(COLO, colo).addQueryParam(END, endTime) - .addQueryParam(ENTITY_NAME, entityName).call(Entities.SLA); - return getResponse(SchedulableEntityInstanceResult.class, clientResponse); - } - - public TriageResult triage(String entityType, String entityName, String instanceTime, - String colo) throws FalconCLIException { - ClientResponse clientResponse = new ResourceBuilder().path(Instances.TRIAGE.path, entityType, entityName) - 
.addQueryParam(START, instanceTime).addQueryParam(COLO, colo).call(Instances.TRIAGE); - return getResponse(TriageResult.class, clientResponse); - } - - @Override - public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords, - String filterBy, String filterTags, String orderBy, String sortOrder, - Integer offset, Integer numResults, String doAsUser) throws FalconCLIException { - Entities operation = Entities.LIST; - ClientResponse clientResponse = new ResourceBuilder().path(operation.path, entityType) - .addQueryParam(DO_AS_OPT, doAsUser).addQueryParam(NUM_RESULTS, numResults) - .addQueryParam(OFFSET, offset).addQueryParam(SORT_ORDER, sortOrder) - .addQueryParam(ORDER_BY, orderBy).addQueryParam(FILTER_BY, filterBy) - .addQueryParam(FIELDS, fields).addQueryParam(NAME_SUBSEQUENCE, nameSubsequence) - .addQueryParam(TAG_KEYWORDS, tagKeywords).addQueryParam(FILTER_TAGS, filterTags) - .call(operation); - - printClientResponse(clientResponse); - checkIfSuccessful(clientResponse); - - EntityList result = clientResponse.getEntity(EntityList.class); - if (result == null || result.getElements() == null) { - return null; - } - return result; - } - - @Override - public EntitySummaryResult getEntitySummary(String entityType, String cluster, String start, String end, - String fields, String filterBy, String filterTags, - String orderBy, String sortOrder, Integer offset, Integer numResults, - Integer numInstances, String doAsUser) throws FalconCLIException { - ClientResponse clientResponse = new ResourceBuilder().path(Entities.SUMMARY.path, entityType) - .addQueryParam(CLUSTER, cluster).addQueryParam(START, start).addQueryParam(END, end) - .addQueryParam(SORT_ORDER, sortOrder).addQueryParam(ORDER_BY, orderBy) - .addQueryParam(OFFSET, offset).addQueryParam(NUM_RESULTS, numResults) - .addQueryParam(DO_AS_OPT, doAsUser).addQueryParam(FILTER_BY, filterBy) - .addQueryParam(NUM_INSTANCES, numInstances).addQueryParam(FIELDS, fields) - 
.addQueryParam(FILTER_TAGS, filterTags).call(Entities.SUMMARY); - return getResponse(EntitySummaryResult.class, clientResponse); - } - - @Override - public APIResult touch(String entityType, String entityName, String colo, - Boolean skipDryRun, String doAsUser) throws FalconCLIException { - Entities operation = Entities.TOUCH; - ClientResponse clientResponse = new ResourceBuilder().path(operation.path, entityType, entityName) - .addQueryParam(COLO, colo).addQueryParam(SKIP_DRYRUN, skipDryRun) - .addQueryParam(DO_AS_OPT, doAsUser).call(operation); - return getResponse(APIResult.class, clientResponse); - } - - public InstancesResult getRunningInstances(String type, String entity, String colo, List<LifeCycle> lifeCycles, - String filterBy, String orderBy, String sortOrder, - Integer offset, Integer numResults, String doAsUser) throws FalconCLIException { - ClientResponse clientResponse = new ResourceBuilder().path(Instances.RUNNING.path, type, entity) - .addQueryParam(FILTER_BY, filterBy).addQueryParam(ORDER_BY, orderBy) - .addQueryParam(SORT_ORDER, sortOrder).addQueryParam(OFFSET, offset) - .addQueryParam(NUM_RESULTS, numResults).addQueryParam(COLO, colo) - .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(USER, doAsUser).call(Instances.RUNNING); - return getResponse(InstancesResult.class, clientResponse); - } - - @Override - public InstancesResult getStatusOfInstances(String type, String entity, String start, String end, String colo, - List<LifeCycle> lifeCycles, String filterBy, String orderBy, - String sortOrder, Integer offset, Integer numResults, - String doAsUser, Boolean allAttempts) throws FalconCLIException { - ClientResponse clientResponse = new ResourceBuilder().path(Instances.STATUS.path, type, entity) - .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) - .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(FILTER_BY, filterBy) - .addQueryParam(ORDER_BY, orderBy).addQueryParam(SORT_ORDER, sortOrder) - 
.addQueryParam(OFFSET, offset).addQueryParam(NUM_RESULTS, numResults) - .addQueryParam(ALL_ATTEMPTS, allAttempts).addQueryParam(USER, doAsUser).call(Instances.STATUS); - return getResponse(InstancesResult.class, clientResponse); - } - - public InstancesSummaryResult getSummaryOfInstances(String type, String entity, - String start, String end, - String colo, List<LifeCycle> lifeCycles, - String filterBy, String orderBy, String sortOrder, - String doAsUser) throws FalconCLIException { - ClientResponse clientResponse = new ResourceBuilder().path(Instances.SUMMARY.path, type, entity) - .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) - .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(USER, doAsUser).call(Instances.SUMMARY); - return getResponse(InstancesSummaryResult.class, clientResponse); - } - - public FeedInstanceResult getFeedListing(String type, String entity, String start, - String end, String colo, String doAsUser) throws FalconCLIException { - ClientResponse clientResponse = new ResourceBuilder().path(Instances.KILL.path, type, entity) - .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) - .addQueryParam(USER, doAsUser).call(Instances.LISTING); - return getResponse(FeedInstanceResult.class, clientResponse); - } - - public InstancesResult killInstances(String type, String entity, String start, - String end, String colo, String clusters, - String sourceClusters, List<LifeCycle> lifeCycles, - String doAsUser) throws FalconCLIException, UnsupportedEncodingException { - InputStream props = getServletInputStream(clusters, sourceClusters, null); - ClientResponse clientResponse = new ResourceBuilder().path(Instances.KILL.path, type, entity) - .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) - .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(USER, doAsUser).call(Instances.KILL, props); - return getResponse(InstancesResult.class, clientResponse); - } - - public 
InstancesResult suspendInstances(String type, String entity, String start, String end, String colo, - String clusters, String sourceClusters, List<LifeCycle> lifeCycles, - String doAsUser) throws FalconCLIException, UnsupportedEncodingException { - ClientResponse clientResponse = new ResourceBuilder().path(Instances.SUSPEND.path, type, entity) - .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) - .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(USER, doAsUser).call(Instances.SUSPEND); - return getResponse(InstancesResult.class, clientResponse); - } - - public InstancesResult resumeInstances(String type, String entity, String start, String end, String colo, - String clusters, String sourceClusters, List<LifeCycle> lifeCycles, - String doAsUser) throws FalconCLIException, UnsupportedEncodingException { - ClientResponse clientResponse = new ResourceBuilder().path(Instances.RESUME.path, type, entity) - .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) - .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(USER, doAsUser).call(Instances.RESUME); - return getResponse(InstancesResult.class, clientResponse); - } - - public InstancesResult rerunInstances(String type, String entity, String start, - String end, String filePath, String colo, - String clusters, String sourceClusters, List<LifeCycle> lifeCycles, - Boolean isForced, String doAsUser) - throws FalconCLIException, IOException { - - StringBuilder buffer = new StringBuilder(); - if (filePath != null) { - BufferedReader in = null; - try { - in = new BufferedReader(new FileReader(filePath)); - - String str; - while ((str = in.readLine()) != null) { - buffer.append(str).append("\n"); - } - } finally { - IOUtils.closeQuietly(in); - } - } - String temp = (buffer.length() == 0) ? 
null : buffer.toString(); - InputStream props = getServletInputStream(clusters, sourceClusters, temp); - ClientResponse clientResponse = new ResourceBuilder().path(Instances.RERUN.path, type, entity) - .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) - .addQueryParam(LIFECYCLE, lifeCycles, type).addQueryParam(FORCE, isForced) - .addQueryParam(USER, doAsUser).call(Instances.RERUN, props); - return getResponse(InstancesResult.class, clientResponse); - } - - public InstancesResult getLogsOfInstances(String type, String entity, String start, - String end, String colo, String runId, - List<LifeCycle> lifeCycles, String filterBy, - String orderBy, String sortOrder, Integer offset, - Integer numResults, String doAsUser) throws FalconCLIException { - ClientResponse clientResponse = new ResourceBuilder().path(Instances.LOG.path, type, entity) - .addQueryParam(START, start).addQueryParam(END, end).addQueryParam(COLO, colo) - .addQueryParam(RUN_ID, runId).addQueryParam(LIFECYCLE, lifeCycles, type) - .addQueryParam(FILTER_BY, filterBy).addQueryParam(ORDER_BY, orderBy) - .addQueryParam(SORT_ORDER, sortOrder).addQueryParam(OFFSET, offset) - .addQueryParam(NUM_RESULTS, numResults).addQueryParam(USER, doAsUser).call(Instances.LOG); - return getResponse(InstancesResult.class, clientResponse); - } - - public InstancesResult getParamsOfInstance(String type, String entity, - String start, String colo, - List<LifeCycle> lifeCycles, - String doAsUser) - throws FalconCLIException, UnsupportedEncodingException { - if (!DateValidator.validate(start)) { - throw new FalconCLIException("Start date is mandatory and should be" - + " a valid date in YYYY-MM-DDTHH:MMZ format."); - } - - ClientResponse clientResponse = new ResourceBuilder().path(Instances.PARAMS.path, type, entity) - .addQueryParam(START, start).addQueryParam(LIFECYCLE, lifeCycles, type) - .addQueryParam(USER, doAsUser).call(Instances.PARAMS); - return getResponse(InstancesResult.class, 
clientResponse); - } - - public String getThreadDump(String doAsUser) throws FalconCLIException { - return sendAdminRequest(AdminOperations.STACK, doAsUser); - } - - @Override - public String getVersion(String doAsUser) throws FalconCLIException { - return sendAdminRequest(AdminOperations.VERSION, doAsUser); - } - - public int getStatus(String doAsUser) throws FalconCLIException { - AdminOperations job = AdminOperations.VERSION; - ClientResponse clientResponse = new ResourceBuilder().path(job.path).addQueryParam(DO_AS_OPT, doAsUser) - .call(job); - printClientResponse(clientResponse); - return clientResponse.getStatus(); - } - - public String getDimensionList(String dimensionType, String cluster, String doAsUser) throws FalconCLIException { - return sendMetadataDiscoveryRequest(MetadataOperations.LIST, dimensionType, null, cluster, doAsUser); - } - - public String getReplicationMetricsDimensionList(String schedEntityType, String schedEntityName, - Integer numResults, String doAsUser) throws FalconCLIException { - return sendRequestForReplicationMetrics(MetadataOperations.LIST, - schedEntityType, schedEntityName, numResults, doAsUser); - } - - public LineageGraphResult getEntityLineageGraph(String pipelineName, String doAsUser) throws FalconCLIException { - MetadataOperations operation = MetadataOperations.LINEAGE; - ClientResponse clientResponse = new ResourceBuilder().path(operation.path).addQueryParam(DO_AS_OPT, doAsUser) - .addQueryParam(FalconMetadataCLI.PIPELINE_OPT, pipelineName).call(operation); - printClientResponse(clientResponse); - checkIfSuccessful(clientResponse); - return clientResponse.getEntity(LineageGraphResult.class); - } - - public String getDimensionRelations(String dimensionType, String dimensionName, - String doAsUser) throws FalconCLIException { - return sendMetadataDiscoveryRequest(MetadataOperations.RELATIONS, dimensionType, dimensionName, null, doAsUser); - } - - /** - * Converts a InputStream into ServletInputStream. 
- * - * @param filePath - Path of file to stream - * @return ServletInputStream - * @throws FalconCLIException - */ - private InputStream getServletInputStream(String filePath) - throws FalconCLIException { - - if (filePath == null) { - return null; - } - InputStream stream; - try { - stream = new FileInputStream(filePath); - } catch (FileNotFoundException e) { - throw new FalconCLIException("File not found:", e); - } - return stream; - } - - private <T extends APIResult> T getResponse(Class<T> clazz, - ClientResponse clientResponse) throws FalconCLIException { - printClientResponse(clientResponse); - checkIfSuccessful(clientResponse); - return clientResponse.getEntity(clazz); - } - - private String getResponseAsString(ClientResponse clientResponse) throws FalconCLIException { - printClientResponse(clientResponse); - checkIfSuccessful(clientResponse); - return clientResponse.getEntity(String.class); - } - - private class ResourceBuilder { - WebResource resource; - - private ResourceBuilder path(String... 
paths) { - for (String path : paths) { - if (resource == null) { - resource = service.path(path); - } else { - resource = resource.path(path); - } - } - return this; - } - - public ResourceBuilder addQueryParam(String paramName, Integer value) { - if (value != null) { - resource = resource.queryParam(paramName, value.toString()); - } - return this; - } - - public ResourceBuilder addQueryParam(String paramName, Boolean paramValue) { - if (paramValue != null) { - resource = resource.queryParam(paramName, String.valueOf(paramValue)); - } - return this; - } - - public ResourceBuilder addQueryParam(String paramName, String paramValue) { - if (StringUtils.isNotBlank(paramValue)) { - resource = resource.queryParam(paramName, paramValue); - } - return this; - } - - public ResourceBuilder addQueryParam(String paramName, List<LifeCycle> lifeCycles, - String type) throws FalconCLIException { - if (lifeCycles != null) { - checkLifeCycleOption(lifeCycles, type); - for (LifeCycle lifeCycle : lifeCycles) { - resource = resource.queryParam(paramName, lifeCycle.toString()); - } - } - return this; - } - - private ClientResponse call(Entities entities) { - return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(entities.mimeType).type(MediaType.TEXT_XML) - .method(entities.method, ClientResponse.class); - } - - public ClientResponse call(AdminOperations operation) { - return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(operation.mimeType).type(MediaType.TEXT_XML) - .method(operation.method, ClientResponse.class); - } - - private ClientResponse call(MetadataOperations metadataOperations) { - return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(metadataOperations.mimeType).type(MediaType.TEXT_XML) - .method(metadataOperations.method, ClientResponse.class); - } - - public ClientResponse call(Instances operation) { - return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - 
.accept(operation.mimeType).type(MediaType.TEXT_XML) - .method(operation.method, ClientResponse.class); - } - - public ClientResponse call(Entities operation, InputStream entityStream) { - return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(operation.mimeType).type(MediaType.TEXT_XML) - .method(operation.method, ClientResponse.class, entityStream); - } - - public ClientResponse call(Instances operation, InputStream entityStream) { - return resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(operation.mimeType).type(MediaType.TEXT_XML) - .method(operation.method, ClientResponse.class, entityStream); - } - } - - public FeedLookupResult reverseLookUp(String type, String path, String doAsUser) throws FalconCLIException { - Entities api = Entities.LOOKUP; - ClientResponse response = new ResourceBuilder().path(api.path, type).addQueryParam(DO_AS_OPT, doAsUser) - .addQueryParam(PATH, path).call(api); - return getResponse(FeedLookupResult.class, response); - } - - public FeedInstanceResult getFeedInstanceListing(String type, String entity, String start, String end, String colo - , String doAsUser) throws FalconCLIException { - - checkType(type); - Instances api = Instances.LISTING; - ClientResponse clientResponse = new ResourceBuilder().path(api.path, type, entity) - .addQueryParam(COLO, colo).addQueryParam(DO_AS_OPT, doAsUser).addQueryParam(START, start) - .addQueryParam(END, end).call(api); - return getResponse(FeedInstanceResult.class, clientResponse); - } - - - public InstanceDependencyResult getInstanceDependencies(String entityType, String entityName, String instanceTime, - String colo) throws FalconCLIException { - checkType(entityType); - Instances api = Instances.DEPENDENCY; - ClientResponse clientResponse = new ResourceBuilder().path(api.path, entityType, entityName) - .addQueryParam(COLO, colo).addQueryParam(INSTANCE_TIME, instanceTime).call(api); - return getResponse(InstanceDependencyResult.class, 
clientResponse); - } - - //RESUME CHECKSTYLE CHECK VisibilityModifierCheck - - private void checkLifeCycleOption(List<LifeCycle> lifeCycles, String type) throws FalconCLIException { - if (lifeCycles != null && !lifeCycles.isEmpty()) { - EntityType entityType = EntityType.getEnum(type); - for (LifeCycle lifeCycle : lifeCycles) { - if (entityType != lifeCycle.getTag().getType()) { - throw new FalconCLIException("Incorrect lifecycle: " + lifeCycle + "for given type: " + type); - } - } - } - } - - protected void checkType(String type) throws FalconCLIException { - if (type == null || type.isEmpty()) { - throw new FalconCLIException("entity type is empty"); - } else { - EntityType entityType = EntityType.getEnum(type); - if (entityType == EntityType.CLUSTER) { - throw new FalconCLIException( - "Instance management functions don't apply to Cluster entities"); - } - } - } - - private String sendAdminRequest(AdminOperations job, String doAsUser) throws FalconCLIException { - ClientResponse clientResponse = new ResourceBuilder().path(job.path).addQueryParam(DO_AS_OPT, doAsUser) - .call(job); - return getResponseAsString(clientResponse); - } - - private String sendRequestForReplicationMetrics(final MetadataOperations operation, final String schedEntityType, - final String schedEntityName, Integer numResults, - final String doAsUser) throws FalconCLIException { - WebResource resource = service.path(operation.path) - .path(schedEntityName) - .path(RelationshipType.REPLICATION_METRICS.getName()) - .path(FalconMetadataCLI.LIST_OPT); - - if (StringUtils.isNotEmpty(schedEntityName)) { - resource = resource.queryParam(FalconCLI.TYPE_OPT, schedEntityType); - } - - if (numResults != null) { - resource = resource.queryParam(FalconCLI.NUM_RESULTS_OPT, numResults.toString()); - } - - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); - } - - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + 
authenticationToken) - .accept(operation.mimeType).type(operation.mimeType) - .method(operation.method, ClientResponse.class); - - printClientResponse(clientResponse); - - checkIfSuccessful(clientResponse); - return clientResponse.getEntity(String.class); - - } - - private String sendMetadataDiscoveryRequest(final MetadataOperations operation, - final String dimensionType, - final String dimensionName, - final String cluster, - final String doAsUser) throws FalconCLIException { - WebResource resource; - switch (operation) { - case LIST: - resource = service.path(operation.path) - .path(dimensionType) - .path(FalconMetadataCLI.LIST_OPT); - break; - - case RELATIONS: - resource = service.path(operation.path) - .path(dimensionType) - .path(dimensionName) - .path(FalconMetadataCLI.RELATIONS_OPT); - break; - - default: - throw new FalconCLIException("Invalid Metadata client Operation " + operation.toString()); - } - - if (!StringUtils.isEmpty(cluster)) { - resource = resource.queryParam(FalconMetadataCLI.CLUSTER_OPT, cluster); - } - - if (StringUtils.isNotEmpty(doAsUser)) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser); - } - - ClientResponse clientResponse = resource - .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) - .accept(operation.mimeType).type(operation.mimeType) - .method(operation.method, ClientResponse.class); - - printClientResponse(clientResponse); - - checkIfSuccessful(clientResponse); - return clientResponse.getEntity(String.class); - } - - - public String getVertex(String id, String doAsUser) throws FalconCLIException { - return sendMetadataLineageRequest(MetadataOperations.VERTICES, id, doAsUser); - } - - public String getVertices(String key, String value, String doAsUser) throws FalconCLIException { - return sendMetadataLineageRequest(MetadataOperations.VERTICES, key, value, doAsUser); - } - - public String getVertexEdges(String id, String direction, String doAsUser) throws FalconCLIException { - return 
sendMetadataLineageRequestForEdges(MetadataOperations.VERTICES, id, direction, doAsUser); - } - - public String getEdge(String id, String doAsUser) throws FalconCLIException { - return sendMetadataLineageRequest(MetadataOperations.EDGES, id, doAsUser); - } - - private String getRecipePath(String recipePropertiesFile) throws FalconCLIException { - String recipePath = null; - if (StringUtils.isNotBlank(recipePropertiesFile)) { - File file = new File(recipePropertiesFile); - if (file.exists()) { - recipePath = file.getAbsoluteFile().getParentFile().getAbsolutePath(); - } - } else { - recipePath = clientProperties.getProperty("falcon.recipe.path"); - } - - return recipePath; - } - - public APIResult submitRecipe(String recipeName, String recipeToolClassName, - final String recipeOperation, String recipePropertiesFile, Boolean skipDryRun, - final String doAsUser) throws FalconCLIException { - String recipePath = getRecipePath(recipePropertiesFile); - - if (StringUtils.isEmpty(recipePath)) { - throw new FalconCLIException("falcon.recipe.path is not set in client.properties or properties " - + " file is not provided"); - } - - String templateFilePath = recipePath + File.separator + recipeName + TEMPLATE_SUFFIX; - File file = new File(templateFilePath); - if (!file.exists()) { - throw new FalconCLIException("Recipe template file does not exist : " + templateFilePath); - } - - String propertiesFilePath = recipePath + File.separator + recipeName + PROPERTIES_SUFFIX; - file = new File(propertiesFilePath); - if (!file.exists()) { - throw new FalconCLIException("Recipe properties file does not exist : " + propertiesFilePath); - } - - String processFile; - try { - String prefix = "falcon-recipe" + "-" + System.currentTimeMillis(); - File tmpPath = new File("/tmp"); - if (!tmpPath.exists()) { - if (!tmpPath.mkdir()) { - throw new FalconCLIException("Creating directory failed: " + tmpPath.getAbsolutePath()); - } - } - File f = File.createTempFile(prefix, ".xml", tmpPath); - 
f.deleteOnExit(); - - processFile = f.getAbsolutePath(); - String[] args = { - "-" + RecipeToolArgs.RECIPE_FILE_ARG.getName(), templateFilePath, - "-" + RecipeToolArgs.RECIPE_PROPERTIES_FILE_ARG.getName(), propertiesFilePath, - "-" + RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG.getName(), processFile, - "-" + RecipeToolArgs.RECIPE_OPERATION_ARG.getName(), recipeOperation, - }; - - if (recipeToolClassName != null) { - Class<?> clz = Class.forName(recipeToolClassName); - Method method = clz.getMethod("main", String[].class); - method.invoke(null, args); - } else { - RecipeTool.main(args); - } - validate(EntityType.PROCESS.toString(), processFile, skipDryRun, doAsUser); - return submitAndSchedule(EntityType.PROCESS.toString(), processFile, skipDryRun, doAsUser, null); - } catch (Exception e) { - throw new FalconCLIException(e.getMessage(), e); - } - } - - private String sendMetadataLineageRequest(MetadataOperations job, String id, - String doAsUser) throws FalconCLIException { - ClientResponse clientResponse = new ResourceBuilder().path(job.path, id).addQueryParam(DO_AS_OPT, doAsUser) - .call(job); - return getResponseAsString(clientResponse); - } - - private String sendMetadataLineageRequest(MetadataOperations job, String key, - String value, String doAsUser) throws FalconCLIException { - ClientResponse clientResponse = new ResourceBuilder().path(job.path).addQueryParam(DO_AS_OPT, doAsUser) - .addQueryParam(KEY, key).addQueryParam(VALUE, value).call(job); - return getResponseAsString(clientResponse); - } - - private String sendMetadataLineageRequestForEdges(MetadataOperations job, String id, - String direction, String doAsUser) throws FalconCLIException { - ClientResponse clientResponse = new ResourceBuilder().path(job.path, id, direction) - .addQueryParam(DO_AS_OPT, doAsUser).call(job); - return getResponseAsString(clientResponse); - } - - private void checkIfSuccessful(ClientResponse clientResponse) throws FalconCLIException { - Response.Status.Family 
statusFamily = clientResponse.getClientResponseStatus().getFamily(); - if (statusFamily != Response.Status.Family.SUCCESSFUL && statusFamily != Response.Status.Family.INFORMATIONAL) { - throw FalconCLIException.fromReponse(clientResponse); - } - } - - private void printClientResponse(ClientResponse clientResponse) { - if (getDebugMode()) { - OUT.get().println(clientResponse.toString()); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/client/src/main/java/org/apache/falcon/entity/v0/AccessControlList.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/entity/v0/AccessControlList.java b/client/src/main/java/org/apache/falcon/entity/v0/AccessControlList.java deleted file mode 100644 index 89ce6f9..0000000 --- a/client/src/main/java/org/apache/falcon/entity/v0/AccessControlList.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity.v0; - -/** - * Access control list for an Entity. 
- */ -public abstract class AccessControlList { - - public abstract String getOwner(); - - public abstract String getGroup(); - - public abstract String getPermission(); - - @Override - public String toString() { - return "AccessControlList{" - + "owner='" + getOwner() + '\'' - + ", group='" + getGroup() + '\'' - + ", permission='" + getPermission() + '\'' - + '}'; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/client/src/main/java/org/apache/falcon/entity/v0/DateValidator.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/entity/v0/DateValidator.java b/client/src/main/java/org/apache/falcon/entity/v0/DateValidator.java deleted file mode 100644 index e211f57..0000000 --- a/client/src/main/java/org/apache/falcon/entity/v0/DateValidator.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.entity.v0; - -import org.apache.commons.lang3.StringUtils; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Date utility class. 
/**
 * Validates timestamps written as {@code yyyy-MM-ddTHH:mmZ}.
 *
 * <p>The accepted syntax is fixed by {@link #DATE_PATTERN}: a year from 1900
 * to 2999, a two-digit month (01-12), a two-digit day (01-31), a literal
 * {@code T}, a two-digit hour (00-23), a two-digit minute (00-59) and a
 * literal {@code Z}. On top of the syntax check, the day is verified against
 * the real length of the month, including the Gregorian leap-year rule.
 */
public final class DateValidator {

    private static final String DATE_PATTERN =
            "(2\\d\\d\\d|19\\d\\d)-(0[1-9]|1[012])-(0[1-9]|1[0-9]|2[0-9]|3[01])T([0-1][0-9]|2[0-3]):([0-5][0-9])Z";
    private static final Pattern PATTERN = Pattern.compile(DATE_PATTERN);

    private static final int FEBRUARY = 2;
    private static final int APRIL = 4;
    private static final int JUNE = 6;
    private static final int SEPTEMBER = 9;
    private static final int NOVEMBER = 11;

    private DateValidator() {
    }

    /**
     * Validate date format with regular expression.
     *
     * @param date date string to validate; may be {@code null}
     * @return {@code true} if the string has a valid date format and names a
     *         day that exists in the given month and year, {@code false}
     *         otherwise (including for {@code null}, empty or blank input)
     */
    public static boolean validate(final String date) {
        // Only null needs an explicit guard: empty and whitespace-only strings
        // can never satisfy the pattern below.
        if (date == null) {
            return false;
        }

        Matcher matcher = PATTERN.matcher(date);
        if (!matcher.matches()) {
            return false;
        }

        // A successful matches() already exposes the capture groups, so no
        // reset()/find() round trip is required.
        int year = Integer.parseInt(matcher.group(1));
        int month = Integer.parseInt(matcher.group(2));
        int day = Integer.parseInt(matcher.group(3));

        // The pattern guarantees day >= 1, so only the upper bound is checked.
        return day <= daysInMonth(year, month);
    }

    /** Returns the number of days in the given month (1-12) of the given year. */
    private static int daysInMonth(int year, int month) {
        switch (month) {
        case FEBRUARY:
            return isLeapYear(year) ? 29 : 28;
        case APRIL:
        case JUNE:
        case SEPTEMBER:
        case NOVEMBER:
            return 30;
        default:
            return 31;
        }
    }

    /**
     * Gregorian leap-year rule: divisible by 4, except century years, which
     * must also be divisible by 400 (so 2000 is a leap year, 1900 and 2100
     * are not).
     */
    private static boolean isLeapYear(int year) {
        return (year % 4 == 0 && year % 100 != 0) || year % 400 == 0;
    }
}
