Repository: falcon Updated Branches: refs/heads/master ffda98cc1 -> 617d5ab94
FALCON-2190 server side checks to not let any entity operations on entities generated by extensions Author: sandeep <[email protected]> Reviewers: @pallavi-rao Closes #316 from sandeepSamudrala/FALCON-2190 and squashes the following commits: 57170c4 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2190 acf2734 [sandeep] FALCON-2190 Incorporated review comments d343ea3 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2190 07c7dd7 [sandeep] FALCON-2190 Incorporated review comments ca6fbb1 [sandeep] FALCON-2190 Incorporated review comments 94c93c4 [sandeep] FALCON-2190 Incorporated review comments 7873b05 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2190 e7c438a [sandeep] FALCON-2190 Fixed tags issue with extension being null for usual entities 17a15c8 [sandeep] FALCON-2190 Fixed check style issues 4c28d1c [sandeep] FALCON-2190 server side checks to not let any entity operations on entities generated by extensions f701317 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2190 fd2357b [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2190 8aacd75 [sandeep] FALCON-2183 Incorporated review comments f3d7268 [sandeep] FALCON-2183 Incorporated review comments 11e7b3f [sandeep] FALCON-2183 Extension Builder changes to support new user extensions 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/617d5ab9 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/617d5ab9 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/617d5ab9 Branch: refs/heads/master Commit: 617d5ab9492cb8c8a07fda0411a9a7826cff091c Parents: ffda98c Author: sandeep <[email protected]> Authored: Thu Dec 15 10:10:03 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Thu Dec 15 10:10:03 2016 +0530 ---------------------------------------------------------------------- .../falcon/client/AbstractFalconClient.java | 2 +- .../apache/falcon/entity/CatalogStorage.java | 2 +- .../extensions/jdbc/ExtensionMetaStore.java | 17 +++++ .../resource/extensions/ExtensionManager.java | 4 +- .../proxy/SchedulableEntityManagerProxy.java | 68 ++++++++++++++++---- 5 files changed, 78 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/617d5ab9/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index fc6bc14..3dabf52 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -521,7 +521,7 @@ public abstract class AbstractFalconClient { try { stream = new FileInputStream(filePath); } catch (FileNotFoundException e) { - throw new FalconCLIException("File not found:", e); + throw new FalconCLIException("File not found:" + filePath, e); } return stream; } http://git-wip-us.apache.org/repos/asf/falcon/blob/617d5ab9/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java index 31feee8..4633796 100644 --- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java +++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java @@ -57,7 +57,7 @@ import java.util.regex.Matcher; */ public class CatalogStorage extends Configured implements Storage { - private static final Logger LOG = LoggerFactory.getLogger(EvictionHelper.class); + private static final Logger LOG = LoggerFactory.getLogger(CatalogStorage.class); // constants to be used while preparing HCatalog partition filter query private static final String FILTER_ST_BRACKET = "("; http://git-wip-us.apache.org/repos/asf/falcon/blob/617d5ab9/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index 882582f..4250e15 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -75,6 +75,23 @@ public class ExtensionMetaStore { return false; } + public Boolean checkIfExtensionJobExists(String jobName) { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION_JOB); + q.setParameter(JOB_NAME, jobName); + int resultSize = 0; + try { + resultSize = q.getResultList().size(); + } finally { + commitAndCloseTransaction(entityManager); + } + if (resultSize > 0){ + return true; + } + return false; + } + public List<ExtensionBean> getAllExtensions() { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); http://git-wip-us.apache.org/repos/asf/falcon/blob/617d5ab9/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java index 9a7daa5..7c30c83 100644 --- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java @@ -331,7 +331,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { LOG.error("Error while submitting extension job: ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } - return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName); + return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully:" + jobName); } private Map<EntityType, List<Entity>> getEntityList(String extensionName, String jobName, @@ -725,7 +725,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { return groupedEntities; } - private String getJobNameFromTag(String tags) { + public static String getJobNameFromTag(String tags) { int nameStart = tags.indexOf(TAG_PREFIX_EXTENSION_JOB); if (nameStart == -1) { return null; http://git-wip-us.apache.org/repos/asf/falcon/blob/617d5ab9/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 07334d6..316567e 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -27,6 +27,8 @@ import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; +import org.apache.falcon.extensions.store.ExtensionStore; import org.apache.falcon.monitors.Dimension; import org.apache.falcon.monitors.Monitored; import org.apache.falcon.resource.APIResult; @@ -37,6 +39,7 @@ import org.apache.falcon.resource.FeedLookupResult; import org.apache.falcon.resource.SchedulableEntityInstanceResult; import org.apache.falcon.resource.channel.Channel; import org.apache.falcon.resource.channel.ChannelFactory; +import org.apache.falcon.resource.extensions.ExtensionManager; import org.apache.falcon.util.DeploymentUtil; import javax.servlet.http.HttpServletRequest; @@ -163,6 +166,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana Map<String, APIResult> results = new HashMap<String, APIResult>(); final Set<String> colos = getApplicableColos(type, entity); + entityHasExtensionJobTag(entity); validateEntity(entity, colos); results.put(FALCON_TAG, new EntityProxy(type, entity.getName()) { @@ -246,7 +250,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana * Delete the specified entity. * @param request Servlet Request * @param type Valid options are cluster, feed or process. - * @param entity Name of the entity. + * @param entityName Name of the entity. * @param ignore colo is ignored * @return Results of the delete operation. */ @@ -257,21 +261,26 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override public APIResult delete( @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type, - @Dimension("entityName") @PathParam("entity") final String entity, + @Dimension("entityName") @PathParam("entity") final String entityName, @Dimension("colo") @QueryParam("colo") String ignore) { + try { + isEntityPartOfAnExtension(EntityUtil.getEntity(type, entityName)); + } catch (FalconException e) { + throw FalconWebException.newAPIException(e); + } final HttpServletRequest bufferedRequest = new BufferedRequest(request); Map<String, APIResult> results = new HashMap<String, APIResult>(); - results.put(FALCON_TAG, new EntityProxy(type, entity) { + results.put(FALCON_TAG, new EntityProxy(type, entityName) { @Override public APIResult execute() { try { - EntityUtil.getEntity(type, entity); + EntityUtil.getEntity(type, entityName); return super.execute(); } catch (EntityNotRegisteredException e) { return new APIResult(APIResult.Status.SUCCEEDED, - entity + "(" + type + ") doesn't exist. Nothing to do"); + entityName + "(" + type + ") doesn't exist. Nothing to do"); } catch (FalconException e) { throw FalconWebException.newAPIException(e); } @@ -279,13 +288,13 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected APIResult doExecute(String colo) throws FalconException { - return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entity, colo); + return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, colo); } }.execute()); // delete only if deleted from everywhere if (!embeddedMode && results.get(FALCON_TAG).getStatus() == APIResult.Status.SUCCEEDED) { - results.put(PRISM_TAG, super.delete(bufferedRequest, type, entity, currentColo)); + results.put(PRISM_TAG, super.delete(bufferedRequest, type, entityName, currentColo)); } return consolidateResult(results, APIResult.class); } @@ -310,9 +319,16 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Dimension("colo") @QueryParam("colo") String ignore, @QueryParam("skipDryRun") final Boolean skipDryRun) { + try { + isEntityPartOfAnExtension(EntityUtil.getEntity(type, entityName)); + } catch (FalconException e) { + throw FalconWebException.newAPIException(e); + } final HttpServletRequest bufferedRequest = new BufferedRequest(request); + Entity newEntity = getEntity(bufferedRequest, type); + entityHasExtensionJobTag(newEntity); final Set<String> oldColos = getApplicableColos(type, entityName); - final Set<String> newColos = getApplicableColos(type, getEntity(bufferedRequest, type)); + final Set<String> newColos = getApplicableColos(type, newEntity); final Set<String> mergedColos = new HashSet<String>(); mergedColos.addAll(oldColos); mergedColos.retainAll(newColos); //Common colos where update should be called @@ -378,6 +394,34 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana return consolidateResult(results, APIResult.class); } + private void isEntityPartOfAnExtension(Entity entity) { + String tags = entity.getTags(); + checkExtensionJobExist(tags); + } + + + private void entityHasExtensionJobTag(Entity entity) { + String tags = entity.getTags(); + if (StringUtils.isNotBlank(tags)) { + String jobName = ExtensionManager.getJobNameFromTag(tags); + if (StringUtils.isNotBlank(jobName)) { + throw FalconWebException.newAPIException("Entity has extension job name in the tag. Such entities need " + + "to be submitted as extension jobs:" + jobName); + } + } + } + + private void checkExtensionJobExist(String tags) { + if (tags != null) { + String jobName = ExtensionManager.getJobNameFromTag(tags); + ExtensionMetaStore extensionMetaStore = ExtensionStore.getMetaStore(); + if (jobName != null && extensionMetaStore.checkIfExtensionJobExists(jobName)) { + throw FalconWebException.newAPIException("Entity operation is not allowed on this entity as it is" + + "part of an extension job:" + jobName); + } + } + } + /** * Updates the dependent entities of a cluster in workflow engine. * @param clusterName Name of cluster. @@ -575,16 +619,18 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @QueryParam("skipDryRun") Boolean skipDryRun, @QueryParam("properties") String properties) { BufferedRequest bufferedRequest = new BufferedRequest(request); - String entity = getEntity(bufferedRequest, type).getName(); + final Entity entity = getEntity(bufferedRequest, type); + String entityName = entity.getName(); + entityHasExtensionJobTag(entity); Map<String, APIResult> results = new HashMap<String, APIResult>(); results.put("submit", submit(bufferedRequest, type, coloExpr)); - results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr, skipDryRun, properties)); + results.put("schedule", schedule(bufferedRequest, type, entityName, coloExpr, skipDryRun, properties)); return consolidateResult(results, APIResult.class); } /** * Suspend an entity. - * @param request Servlet Request + * @param request Servlet Requests * @param type Valid options are feed or process. * @param entity Name of the entity. * @param coloExpr Colo on which the query should be run.
