Repository: falcon
Updated Branches:
  refs/heads/master e48877523 -> fcd066a0a


FALCON-1974 Cluster update: Allow superuser to update bundle/coord of dependent entities

Author: bvellanki <[email protected]>

Reviewers: Ying Zheng <[email protected]>

Closes #151 from bvellanki/FALCON-1974
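
The change below has two halves: OozieWorkflowEngine now runs updateInternal() as the bundle owner rather than the calling user, and AbstractEntityManager gains a superuser-only updateClusterDependents path that bumps the cluster version on each dependent feed/process and only pushes an update when that version actually changed. A minimal, self-contained sketch of that "bump only when stale" check follows; ClusterRef and all names are hypothetical stand-ins, not Falcon classes.

// Minimal sketch (not Falcon code) of the "bump only when the version changed" check
// that updateClusterDependents applies to each dependent feed/process cluster below.
import java.util.Arrays;
import java.util.List;

public final class VersionBumpSketch {
    static final class ClusterRef {            // stand-in for a feed/process cluster element
        final String name;
        int version;
        ClusterRef(String name, int version) { this.name = name; this.version = version; }
    }

    static boolean bumpIfStale(List<ClusterRef> refs, String clusterName, int clusterVersion) {
        boolean requireUpdate = false;
        for (ClusterRef ref : refs) {
            if (ref.name.equals(clusterName) && ref.version != clusterVersion) {
                ref.version = clusterVersion;   // align the dependent with the updated cluster
                requireUpdate = true;           // only then does the workflow engine need an update
            }
        }
        return requireUpdate;
    }

    public static void main(String[] args) {
        List<ClusterRef> refs = Arrays.asList(new ClusterRef("primary", 1), new ClusterRef("backup", 2));
        System.out.println(bumpIfStale(refs, "primary", 2));  // true: version moved 1 -> 2
        System.out.println(bumpIfStale(refs, "primary", 2));  // false: already up to date
    }
}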


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/fcd066a0
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/fcd066a0
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/fcd066a0

Branch: refs/heads/master
Commit: fcd066a0a04da49b6ed507f832059b61faa66a79
Parents: e488775
Author: bvellanki <[email protected]>
Authored: Fri May 20 09:37:51 2016 -0700
Committer: bvellanki <[email protected]>
Committed: Fri May 20 09:37:51 2016 -0700

----------------------------------------------------------------------
 .../workflow/engine/OozieWorkflowEngine.java    | 81 ++++++++++----------
 .../falcon/resource/AbstractEntityManager.java  | 78 +++++++++++++------
 .../falcon/cli/FalconClusterUpdateCLIIT.java    | 10 ++-
 3 files changed, 99 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/fcd066a0/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 6b87b38..25f7180 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -1274,7 +1274,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             LOG.debug("Going to update! : {} for cluster {}, bundle: {}",
                     newEntity.toShortString(), cluster, bundle.getId());
             result.append(updateInternal(oldEntity, newEntity, clusterEntity, bundle,
-                    CurrentUser.getUser(), skipDryRun)).append("\n");
+                    bundle.getUser(), skipDryRun)).append("\n");
             LOG.info("Entity update complete: {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster,
                 bundle.getId());
         }
@@ -1434,34 +1434,39 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private String updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle,
-        String user, Boolean skipDryRun) throws FalconException {
+                                  String user, Boolean skipDryRun) throws FalconException {
+        String currentUser = CurrentUser.getUser();
+        switchUser(user);
+
         String clusterName = cluster.getName();
 
         Date effectiveTime = getEffectiveTime(cluster, newEntity);
         LOG.info("Effective time " + effectiveTime);
+        try {
+            //Validate that new entity can be scheduled
+            dryRunForUpdate(cluster, newEntity, effectiveTime, skipDryRun);
+
+            boolean suspended = BUNDLE_SUSPENDED_STATUS.contains(oldBundle.getStatus());
+
+            //Set end times for old coords
+            updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime, newEntity);
+            //schedule new entity
+            String newJobId = scheduleForUpdate(newEntity, cluster, effectiveTime);
+            BundleJob newBundle = null;
+            if (newJobId != null) {
+                newBundle = getBundleInfo(clusterName, newJobId);
+            }
 
-        //Validate that new entity can be scheduled
-        dryRunForUpdate(cluster, newEntity, effectiveTime, skipDryRun);
-
-        boolean suspended = BUNDLE_SUSPENDED_STATUS.contains(oldBundle.getStatus());
-
-        //Set end times for old coords
-        updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime, newEntity);
-
-        //schedule new entity
-        String newJobId = scheduleForUpdate(newEntity, cluster, effectiveTime, user);
-        BundleJob newBundle = null;
-        if (newJobId != null) {
-            newBundle = getBundleInfo(clusterName, newJobId);
-        }
-
-        //Sometimes updateCoords() resumes the suspended coords. So, if already suspended, resume now
-        //Also suspend new bundle
-        if (suspended) {
-            doBundleAction(newEntity, BundleAction.SUSPEND, cluster.getName());
+            //Sometimes updateCoords() resumes the suspended coords. So, if already suspended, resume now
+            //Also suspend new bundle
+            if (suspended) {
+                doBundleAction(newEntity, BundleAction.SUSPEND, cluster.getName());
+            }
+            return getUpdateString(newEntity, effectiveTime, oldBundle, newBundle);
+        } finally {
+            // Switch back to current user in case of exception.
+            switchUser(currentUser);
         }
-
-        return getUpdateString(newEntity, effectiveTime, oldBundle, newBundle);
     }
 
     private Date getEffectiveTime(Cluster cluster, Entity newEntity) {
@@ -1484,27 +1489,19 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private String scheduleForUpdate(Entity entity, Cluster cluster, Date startDate, String user)
-        throws FalconException {
+    private String scheduleForUpdate(Entity entity, Cluster cluster, Date startDate) throws FalconException {
         Entity clone = entity.copy();
-
-        String currentUser = CurrentUser.getUser();
-        switchUser(user);
-        try {
-            EntityUtil.setStartDate(clone, cluster.getName(), startDate);
-            Path buildPath = EntityUtil.getNewStagingPath(cluster, clone);
-            OozieEntityBuilder builder = OozieEntityBuilder.get(clone);
-            Properties properties = builder.build(cluster, buildPath);
-            if (properties != null) {
-                LOG.info("Scheduling {} on cluster {} with props {}", entity.toShortString(), cluster.getName(),
+        EntityUtil.setStartDate(clone, cluster.getName(), startDate);
+        Path buildPath = EntityUtil.getNewStagingPath(cluster, clone);
+        OozieEntityBuilder builder = OozieEntityBuilder.get(clone);
+        Properties properties = builder.build(cluster, buildPath);
+        if (properties != null) {
+            LOG.info("Scheduling {} on cluster {} with props {}", entity.toShortString(), cluster.getName(),
                     properties);
-                return scheduleEntity(cluster.getName(), properties, entity);
-            } else {
-                LOG.info("No new workflow to be scheduled for this " + entity.toShortString());
-                return null;
-            }
-        } finally {
-            switchUser(currentUser);
+            return scheduleEntity(cluster.getName(), properties, entity);
+        } else {
+            LOG.info("No new workflow to be scheduled for this " + entity.toShortString());
+            return null;
         }
     }
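
The hunk above moves the switchUser() handling out of scheduleForUpdate() and wraps the whole of updateInternal() in it, so the dry run, coordinator end-dating, and scheduling all run as the bundle owner and the caller is always restored. A self-contained sketch of that try/finally impersonation shape; the static currentUser field and switchUser() here are stand-ins for Falcon's CurrentUser/switchUser, not the real API.

// Sketch of the "run as the bundle owner, always restore the caller" pattern shown above.
public final class RunAsOwnerSketch {
    private static String currentUser = "falcon";            // hypothetical ambient user

    private static void switchUser(String user) {            // stand-in for OozieWorkflowEngine.switchUser()
        currentUser = user;
    }

    static String updateAsOwner(String bundleOwner) {
        String caller = currentUser;                          // remember who invoked the update
        switchUser(bundleOwner);                              // impersonate the bundle owner
        try {
            // the real code runs dryRunForUpdate/updateCoords/scheduleForUpdate here
            return "updated as " + currentUser;
        } finally {
            switchUser(caller);                               // restored even if the update throws
        }
    }

    public static void main(String[] args) {
        System.out.println(updateAsOwner("bundle-owner"));
        System.out.println("ambient user after update: " + currentUser);
    }
}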
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/fcd066a0/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 1f6be41..5fa345d 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -375,6 +375,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
     public APIResult updateClusterDependents(String clusterName, String colo, Boolean skipDryRun) {
         checkColo(colo);
         try {
+            verifySuperUser();
             Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
             verifySafemodeOperation(cluster, EntityUtil.ENTITY_OPERATION.UPDATE_CLUSTER_DEPENDENTS);
             int clusterVersion = cluster.getVersion();
@@ -390,41 +391,43 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
                 Entity entity = EntityUtil.getEntity(depEntity.second, depEntity.first);
                 switch (entity.getEntityType()) {
                 case FEED:
-                    Clusters feedClusters = ((Feed)entity).getClusters();
-                    List<org.apache.falcon.entity.v0.feed.Cluster> updatedFeedClusters =
-                            new ArrayList<org.apache.falcon.entity.v0.feed.Cluster>();
+                    Feed newFeedEntity = (Feed) entity.copy();
+                    Clusters feedClusters = newFeedEntity.getClusters();
                     if (feedClusters != null) {
+                        boolean requireUpdate = false;
                         for(org.apache.falcon.entity.v0.feed.Cluster feedCluster : feedClusters.getClusters()) {
                             if (feedCluster.getName().equals(clusterName)
                                     && feedCluster.getVersion() != clusterVersion) {
                                 // update feed cluster entity
                                 feedCluster.setVersion(clusterVersion);
+                                requireUpdate = true;
                             }
-                            updatedFeedClusters.add(feedCluster);
                         }
-                        ((Feed)entity).getClusters().getClusters().clear();
-                        ((Feed)entity).getClusters().getClusters().addAll(updatedFeedClusters);
-                        result.append(update(entity, entity.getEntityType().name(),
-                                entity.getName(), skipDryRun).getMessage());
+                        if (requireUpdate) {
+                            result.append(getWorkflowEngine(entity).update(entity, newFeedEntity,
+                                    cluster.getName(), skipDryRun));
+                            updateEntityInConfigStore(entity, newFeedEntity);
+                        }
                     }
                     break;
                 case PROCESS:
-                    org.apache.falcon.entity.v0.process.Clusters processClusters = ((Process)entity).getClusters();
-                    List<org.apache.falcon.entity.v0.process.Cluster> updatedProcClusters =
-                            new ArrayList<org.apache.falcon.entity.v0.process.Cluster>();
+                    Process newProcessEntity = (Process) entity.copy();
+                    org.apache.falcon.entity.v0.process.Clusters processClusters = newProcessEntity.getClusters();
                     if (processClusters != null) {
+                        boolean requireUpdate = false;
                         for(org.apache.falcon.entity.v0.process.Cluster procCluster : processClusters.getClusters()) {
                             if (procCluster.getName().equals(clusterName)
                                     && procCluster.getVersion() != clusterVersion) {
                                 // update process cluster entity
                                 procCluster.setVersion(clusterVersion);
+                                requireUpdate = true;
                             }
-                            updatedProcClusters.add(procCluster);
                         }
-                        ((Process)entity).getClusters().getClusters().clear();
-                        ((Process)entity).getClusters().getClusters().addAll(updatedProcClusters);
-                        result.append(update(entity, entity.getEntityType().name(),
-                                entity.getName(), skipDryRun).getMessage());
+                        if (requireUpdate) {
+                            result.append(getWorkflowEngine(entity).update(entity, newProcessEntity,
+                                    cluster.getName(), skipDryRun));
+                            updateEntityInConfigStore(entity, newProcessEntity);
+                        }
                     }
                     break;
                 default:
@@ -432,12 +435,28 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
                 }
             }
             return new APIResult(APIResult.Status.SUCCEEDED, result.toString());
-        } catch (FalconException e) {
+        } catch (Exception e) {
             LOG.error("Update failed", e);
             throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
         }
     }
 
+    private void updateEntityInConfigStore(Entity oldEntity, Entity newEntity) {
+        List<Entity> tokenList = new ArrayList<>();
+        try {
+            configStore.initiateUpdate(newEntity);
+            obtainEntityLocks(oldEntity, "update", tokenList);
+            configStore.update(newEntity.getEntityType(), newEntity);
+        } catch (Throwable e) {
+            LOG.error("Update failed", e);
+            throw FalconWebException.newAPIException(e);
+        } finally {
+            ConfigurationStore.get().cleanupUpdateInit();
+            releaseEntityLocks(oldEntity.getName(), tokenList);
+        }
+
+    }
+
     private void obtainEntityLocks(Entity entity, String command, List<Entity> tokenList)
         throws FalconException {
         //first obtain lock for the entity for which update is issued.
@@ -483,12 +502,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
         }
 
         if (oldEntity.getEntityType() == EntityType.CLUSTER) {
-            final UserGroupInformation authenticatedUGI = CurrentUser.getAuthenticatedUGI();
-            DefaultAuthorizationProvider authorizationProvider = new DefaultAuthorizationProvider();
-            if (!authorizationProvider.isSuperUser(authenticatedUGI)) {
-                throw new FalconException("Permission denied : "
-                        + "Cluster entity update can only be performed by superuser.");
-            }
+            verifySuperUser();
         }
 
         String[] props = oldEntity.getEntityType().getImmutableProperties();
@@ -529,8 +543,15 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
     }
 
     protected void verifySafemodeOperation(Entity entity, EntityUtil.ENTITY_OPERATION operation) {
-        // if Falcon not in safemode, return
+        // if Falcon not in safemode, allow everything except cluster update
         if (!StartupProperties.isServerInSafeMode()) {
+            if (operation.equals(EntityUtil.ENTITY_OPERATION.UPDATE)
+                    && entity.getEntityType().equals(EntityType.CLUSTER)) {
+                LOG.error("Entity operation {} is only allowed on cluster entities during safemode",
+                        operation.name());
+                throw FalconWebException.newAPIException("Entity operation " + operation.name()
+                        + " is only allowed on cluster entities during safemode");
+            }
             return;
         }
 
@@ -1354,4 +1375,13 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
         }
         return false;
     }
+
+    private void verifySuperUser() throws FalconException, IOException {
+        final UserGroupInformation authenticatedUGI = CurrentUser.getAuthenticatedUGI();
+        DefaultAuthorizationProvider authorizationProvider = new DefaultAuthorizationProvider();
+        if (!authorizationProvider.isSuperUser(authenticatedUGI)) {
+            throw new FalconException("Permission denied : "
+                    + "Cluster entity update can only be performed by superuser.");
+        }
+    }
 }
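
The updateEntityInConfigStore() helper above follows a fixed shape: initiate the update, take the entity locks, write to the config store, and always clean up and release in finally even if the write throws. A minimal stand-alone sketch of that shape; ReentrantLock and the boolean flag are stand-ins for Falcon's entity locks and ConfigurationStore update lifecycle, not the real API.

// Sketch of the lock -> write -> always-release shape used by updateEntityInConfigStore above.
import java.util.concurrent.locks.ReentrantLock;

public final class ConfigStoreUpdateSketch {
    private static final ReentrantLock ENTITY_LOCK = new ReentrantLock();
    private static boolean updateInProgress = false;

    static void updateEntity(String entityName, String newDefinition) {
        updateInProgress = true;        // configStore.initiateUpdate(newEntity)
        ENTITY_LOCK.lock();             // obtainEntityLocks(oldEntity, "update", tokenList)
        try {
            // configStore.update(newEntity.getEntityType(), newEntity)
            System.out.println("updating " + entityName + " -> " + newDefinition);
        } finally {
            updateInProgress = false;   // ConfigurationStore.get().cleanupUpdateInit()
            ENTITY_LOCK.unlock();       // releaseEntityLocks(oldEntity.getName(), tokenList)
        }
    }

    public static void main(String[] args) {
        updateEntity("clicks-feed", "clicks-feed with cluster version 2");
    }
}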

http://git-wip-us.apache.org/repos/asf/falcon/blob/fcd066a0/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java
index f5efa37..b6d2410 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java
@@ -72,16 +72,18 @@ public class FalconClusterUpdateCLIIT {
         filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
         Assert.assertEquals(executeWithURL("entity -submit -type process -file " + filePath), 0);
 
+        // update cluster outside safemode, it should fail
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_UPDATED_TEMPLATE, overlay);
+        Assert.assertEquals(executeWithURL("entity -update -type cluster -file "
+                + filePath + " -name " + overlay.get("cluster")), -1);
 
-        // Update cluster here and test that it works
-
+        // Update cluster after setting safemode and test that it works
         initSafemode();
-        filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_UPDATED_TEMPLATE, overlay);
         Assert.assertEquals(executeWithURL("entity -update -type cluster -file "
                 + filePath + " -name " + overlay.get("cluster")), 0);
         clearSafemode();
 
-        // Try to update dependent entities
+        // Try to update dependent entities, it should succeed
         Assert.assertEquals(executeWithURL("entity -updateClusterDependents -cluster "
                 + overlay.get("cluster") + " -skipDryRun "), 0);
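
For reference, these are the two CLI invocations the test above drives through executeWithURL(), written out with hypothetical cluster and file names. Safemode is toggled by the test helpers initSafemode()/clearSafemode(); it is not part of either command.

// Illustrative only: the command strings passed to the Falcon CLI by the test above.
public final class ClusterUpdateCommandsSketch {
    public static void main(String[] args) {
        String cluster = "primary-cluster";                // hypothetical cluster name
        String clusterFile = "/tmp/cluster-updated.xml";   // hypothetical updated cluster definition

        // Rejected outside safemode, succeeds once the server is in safemode:
        System.out.println("entity -update -type cluster -file " + clusterFile + " -name " + cluster);
        // Superuser-only; run after the cluster update, once safemode is cleared:
        System.out.println("entity -updateClusterDependents -cluster " + cluster + " -skipDryRun ");
    }
}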
 
