Repository: falcon Updated Branches: refs/heads/master e48877523 -> fcd066a0a
FALCON-1974 Cluster update : Allow superuser to update bundle/coord of dependent entities Author: bvellanki <[email protected]> Reviewers: "Ying Zheng <[email protected]>" Closes #151 from bvellanki/FALCON-1974 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/fcd066a0 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/fcd066a0 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/fcd066a0 Branch: refs/heads/master Commit: fcd066a0a04da49b6ed507f832059b61faa66a79 Parents: e488775 Author: bvellanki <[email protected]> Authored: Fri May 20 09:37:51 2016 -0700 Committer: bvellanki <[email protected]> Committed: Fri May 20 09:37:51 2016 -0700 ---------------------------------------------------------------------- .../workflow/engine/OozieWorkflowEngine.java | 81 ++++++++++---------- .../falcon/resource/AbstractEntityManager.java | 78 +++++++++++++------ .../falcon/cli/FalconClusterUpdateCLIIT.java | 10 ++- 3 files changed, 99 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/fcd066a0/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 6b87b38..25f7180 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -1274,7 +1274,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { LOG.debug("Going to update! : {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster, bundle.getId()); result.append(updateInternal(oldEntity, newEntity, clusterEntity, bundle, - CurrentUser.getUser(), skipDryRun)).append("\n"); + bundle.getUser(), skipDryRun)).append("\n"); LOG.info("Entity update complete: {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster, bundle.getId()); } @@ -1434,34 +1434,39 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private String updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle, - String user, Boolean skipDryRun) throws FalconException { + String user, Boolean skipDryRun) throws FalconException { + String currentUser = CurrentUser.getUser(); + switchUser(user); + String clusterName = cluster.getName(); Date effectiveTime = getEffectiveTime(cluster, newEntity); LOG.info("Effective time " + effectiveTime); + try { + //Validate that new entity can be scheduled + dryRunForUpdate(cluster, newEntity, effectiveTime, skipDryRun); + + boolean suspended = BUNDLE_SUSPENDED_STATUS.contains(oldBundle.getStatus()); + + //Set end times for old coords + updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime, newEntity); + //schedule new entity + String newJobId = scheduleForUpdate(newEntity, cluster, effectiveTime); + BundleJob newBundle = null; + if (newJobId != null) { + newBundle = getBundleInfo(clusterName, newJobId); + } - //Validate that new entity can be scheduled - dryRunForUpdate(cluster, newEntity, effectiveTime, skipDryRun); - - boolean suspended = BUNDLE_SUSPENDED_STATUS.contains(oldBundle.getStatus()); - - //Set end times for old coords - updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime, newEntity); - - //schedule new entity - String newJobId = scheduleForUpdate(newEntity, cluster, effectiveTime, user); - BundleJob newBundle = null; - if (newJobId != null) { - newBundle = getBundleInfo(clusterName, newJobId); - } - - //Sometimes updateCoords() resumes the suspended coords. So, if already suspended, resume now - //Also suspend new bundle - if (suspended) { - doBundleAction(newEntity, BundleAction.SUSPEND, cluster.getName()); + //Sometimes updateCoords() resumes the suspended coords. So, if already suspended, resume now + //Also suspend new bundle + if (suspended) { + doBundleAction(newEntity, BundleAction.SUSPEND, cluster.getName()); + } + return getUpdateString(newEntity, effectiveTime, oldBundle, newBundle); + } finally { + // Switch back to current user in case of exception. + switchUser(currentUser); } - - return getUpdateString(newEntity, effectiveTime, oldBundle, newBundle); } private Date getEffectiveTime(Cluster cluster, Entity newEntity) { @@ -1484,27 +1489,19 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } - private String scheduleForUpdate(Entity entity, Cluster cluster, Date startDate, String user) - throws FalconException { + private String scheduleForUpdate(Entity entity, Cluster cluster, Date startDate) throws FalconException { Entity clone = entity.copy(); - - String currentUser = CurrentUser.getUser(); - switchUser(user); - try { - EntityUtil.setStartDate(clone, cluster.getName(), startDate); - Path buildPath = EntityUtil.getNewStagingPath(cluster, clone); - OozieEntityBuilder builder = OozieEntityBuilder.get(clone); - Properties properties = builder.build(cluster, buildPath); - if (properties != null) { - LOG.info("Scheduling {} on cluster {} with props {}", entity.toShortString(), cluster.getName(), + EntityUtil.setStartDate(clone, cluster.getName(), startDate); + Path buildPath = EntityUtil.getNewStagingPath(cluster, clone); + OozieEntityBuilder builder = OozieEntityBuilder.get(clone); + Properties properties = builder.build(cluster, buildPath); + if (properties != null) { + LOG.info("Scheduling {} on cluster {} with props {}", entity.toShortString(), cluster.getName(), properties); - return scheduleEntity(cluster.getName(), properties, entity); - } else { - LOG.info("No new workflow to be scheduled for this " + entity.toShortString()); - return null; - } - } finally { - switchUser(currentUser); + return scheduleEntity(cluster.getName(), properties, entity); + } else { + LOG.info("No new workflow to be scheduled for this " + entity.toShortString()); + return null; } } http://git-wip-us.apache.org/repos/asf/falcon/blob/fcd066a0/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 1f6be41..5fa345d 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -375,6 +375,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { public APIResult updateClusterDependents(String clusterName, String colo, Boolean skipDryRun) { checkColo(colo); try { + verifySuperUser(); Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName); verifySafemodeOperation(cluster, EntityUtil.ENTITY_OPERATION.UPDATE_CLUSTER_DEPENDENTS); int clusterVersion = cluster.getVersion(); @@ -390,41 +391,43 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { Entity entity = EntityUtil.getEntity(depEntity.second, depEntity.first); switch (entity.getEntityType()) { case FEED: - Clusters feedClusters = ((Feed)entity).getClusters(); - List<org.apache.falcon.entity.v0.feed.Cluster> updatedFeedClusters = - new ArrayList<org.apache.falcon.entity.v0.feed.Cluster>(); + Feed newFeedEntity = (Feed) entity.copy(); + Clusters feedClusters = newFeedEntity.getClusters(); if (feedClusters != null) { + boolean requireUpdate = false; for(org.apache.falcon.entity.v0.feed.Cluster feedCluster : feedClusters.getClusters()) { if (feedCluster.getName().equals(clusterName) && feedCluster.getVersion() != clusterVersion) { // update feed cluster entity feedCluster.setVersion(clusterVersion); + requireUpdate = true; } - updatedFeedClusters.add(feedCluster); } - ((Feed)entity).getClusters().getClusters().clear(); - ((Feed)entity).getClusters().getClusters().addAll(updatedFeedClusters); - result.append(update(entity, entity.getEntityType().name(), - entity.getName(), skipDryRun).getMessage()); + if (requireUpdate) { + result.append(getWorkflowEngine(entity).update(entity, newFeedEntity, + cluster.getName(), skipDryRun)); + updateEntityInConfigStore(entity, newFeedEntity); + } } break; case PROCESS: - org.apache.falcon.entity.v0.process.Clusters processClusters = ((Process)entity).getClusters(); - List<org.apache.falcon.entity.v0.process.Cluster> updatedProcClusters = - new ArrayList<org.apache.falcon.entity.v0.process.Cluster>(); + Process newProcessEntity = (Process) entity.copy(); + org.apache.falcon.entity.v0.process.Clusters processClusters = newProcessEntity.getClusters(); if (processClusters != null) { + boolean requireUpdate = false; for(org.apache.falcon.entity.v0.process.Cluster procCluster : processClusters.getClusters()) { if (procCluster.getName().equals(clusterName) && procCluster.getVersion() != clusterVersion) { // update feed cluster entity procCluster.setVersion(clusterVersion); + requireUpdate = true; } - updatedProcClusters.add(procCluster); } - ((Process)entity).getClusters().getClusters().clear(); - ((Process)entity).getClusters().getClusters().addAll(updatedProcClusters); - result.append(update(entity, entity.getEntityType().name(), - entity.getName(), skipDryRun).getMessage()); + if (requireUpdate) { + result.append(getWorkflowEngine(entity).update(entity, newProcessEntity, + cluster.getName(), skipDryRun)); + updateEntityInConfigStore(entity, newProcessEntity); + } } break; default: @@ -432,12 +435,28 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { } } return new APIResult(APIResult.Status.SUCCEEDED, result.toString()); - } catch (FalconException e) { + } catch (Exception e) { LOG.error("Update failed", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } } + private void updateEntityInConfigStore(Entity oldEntity, Entity newEntity) { + List<Entity> tokenList = new ArrayList<>(); + try { + configStore.initiateUpdate(newEntity); + obtainEntityLocks(oldEntity, "update", tokenList); + configStore.update(newEntity.getEntityType(), newEntity); + } catch (Throwable e) { + LOG.error("Update failed", e); + throw FalconWebException.newAPIException(e); + } finally { + ConfigurationStore.get().cleanupUpdateInit(); + releaseEntityLocks(oldEntity.getName(), tokenList); + } + + } + private void obtainEntityLocks(Entity entity, String command, List<Entity> tokenList) throws FalconException { //first obtain lock for the entity for which update is issued. @@ -483,12 +502,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { } if (oldEntity.getEntityType() == EntityType.CLUSTER) { - final UserGroupInformation authenticatedUGI = CurrentUser.getAuthenticatedUGI(); - DefaultAuthorizationProvider authorizationProvider = new DefaultAuthorizationProvider(); - if (!authorizationProvider.isSuperUser(authenticatedUGI)) { - throw new FalconException("Permission denied : " - + "Cluster entity update can only be performed by superuser."); - } + verifySuperUser(); } String[] props = oldEntity.getEntityType().getImmutableProperties(); @@ -529,8 +543,15 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { } protected void verifySafemodeOperation(Entity entity, EntityUtil.ENTITY_OPERATION operation) { - // if Falcon not in safemode, return + // if Falcon not in safemode, allow everything except cluster update if (!StartupProperties.isServerInSafeMode()) { + if (operation.equals(EntityUtil.ENTITY_OPERATION.UPDATE) + && entity.getEntityType().equals(EntityType.CLUSTER)) { + LOG.error("Entity operation {} is only allowed on cluster entities during safemode", + operation.name()); + throw FalconWebException.newAPIException("Entity operation " + operation.name() + + " is only allowed on cluster entities during safemode"); + } return; } @@ -1354,4 +1375,13 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { } return false; } + + private void verifySuperUser() throws FalconException, IOException { + final UserGroupInformation authenticatedUGI = CurrentUser.getAuthenticatedUGI(); + DefaultAuthorizationProvider authorizationProvider = new DefaultAuthorizationProvider(); + if (!authorizationProvider.isSuperUser(authenticatedUGI)) { + throw new FalconException("Permission denied : " + + "Cluster entity update can only be performed by superuser."); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/fcd066a0/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java index f5efa37..b6d2410 100644 --- a/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java +++ b/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java @@ -72,16 +72,18 @@ public class FalconClusterUpdateCLIIT { filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay); Assert.assertEquals(executeWithURL("entity -submit -type process -file " + filePath), 0); + // update cluster outside safemode, it should fail + filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_UPDATED_TEMPLATE, overlay); + Assert.assertEquals(executeWithURL("entity -update -type cluster -file " + + filePath + " -name " + overlay.get("cluster")), -1); - // Update cluster here and test that it works - + // Update cluster after setting safemode and test that it works initSafemode(); - filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_UPDATED_TEMPLATE, overlay); Assert.assertEquals(executeWithURL("entity -update -type cluster -file " + filePath + " -name " + overlay.get("cluster")), 0); clearSafemode(); - // Try to update dependent entities + // Try to update dependent entities, it should succeed Assert.assertEquals(executeWithURL("entity -updateClusterDependents -cluster " + overlay.get("cluster") + " -skipDryRun "), 0);
