This is an automated email from the ASF dual-hosted git repository.

arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a700803  HDFS-15154. Allow only hdfs superusers the ability to assign HDFS storage policies. Contributed by Siddharth Wagle.
a700803 is described below

commit a700803a18fb957d2799001a2ce1dcb70f75c080
Author: Arpit Agarwal <aagar...@cloudera.com>
AuthorDate: Wed Mar 25 10:28:30 2020 -0700

    HDFS-15154. Allow only hdfs superusers the ability to assign HDFS storage policies. Contributed by Siddharth Wagle.
    
    Change-Id: I32d6dd2837945b8fc026a759aa367c55daefe348
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../hadoop/hdfs/server/namenode/FSDirAttrOp.java   |  12 +-
 .../hadoop/hdfs/server/namenode/FSDirectory.java   |  13 +-
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  |  61 ++++++--
 .../src/main/resources/hdfs-default.xml            |   9 ++
 .../hdfs/TestStoragePolicyPermissionSettings.java  | 157 +++++++++++++++++++++
 6 files changed, 222 insertions(+), 34 deletions(-)
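
In short: the patch adds a NameNode setting, dfs.storage.policy.permissions.superuser-only, that restricts storage policy changes to HDFS superusers, and moves the existing dfs.storage.policy.enabled check from FSDirectory/FSDirAttrOp up into FSNamesystem. A minimal opt-in sketch (equivalent to setting the new property in hdfs-site.xml; not part of the patch):

    Configuration conf = new HdfsConfiguration();
    // Opt in: only HDFS superusers may set or unset storage policies.
    conf.setBoolean(
        DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY, true);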

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index e3f4d1e..73cddee 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1114,6 +1114,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 
   public static final String  DFS_STORAGE_POLICY_ENABLED_KEY = "dfs.storage.policy.enabled";
   public static final boolean DFS_STORAGE_POLICY_ENABLED_DEFAULT = true;
+  public static final String DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY =
+      "dfs.storage.policy.permissions.superuser-only";
+  public static final boolean
+      DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT = false;
 
   public static final String  DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY = "dfs.quota.by.storage.type.enabled";
   public static final boolean DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
index 83df0aa..8e9606d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@ -47,7 +47,6 @@ import java.util.EnumSet;
 import java.util.List;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 
 public class FSDirAttrOp {
   static FileStatus setPermission(
@@ -151,7 +150,7 @@ public class FSDirAttrOp {
   static FileStatus unsetStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
       BlockManager bm, String src) throws IOException {
     return setStoragePolicy(fsd, pc, bm, src,
-        HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, "unset");
+        HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED);
   }
 
   static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
@@ -162,17 +161,12 @@ public class FSDirAttrOp {
       throw new HadoopIllegalArgumentException(
           "Cannot find a block policy with the name " + policyName);
     }
-    return setStoragePolicy(fsd, pc, bm, src, policy.getId(), "set");
+    return setStoragePolicy(fsd, pc, bm, src, policy.getId());
   }
 
   static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
-      BlockManager bm, String src, final byte policyId, final String operation)
+      BlockManager bm, String src, final byte policyId)
       throws IOException {
-    if (!fsd.isStoragePolicyEnabled()) {
-      throw new IOException(String.format(
-          "Failed to %s storage policy since %s is set to false.", operation,
-          DFS_STORAGE_POLICY_ENABLED_KEY));
-    }
     INodesInPath iip;
     fsd.writeLock();
     try {
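
With the enablement check removed here, FSNamesystem performs it before delegating. A simplified sketch of the new call order in FSNamesystem#setStoragePolicy (see the FSNamesystem hunks below; not a verbatim excerpt):

    checkOperation(OperationCategory.WRITE);
    checkStoragePolicyEnabled("set storage policy", true);  // the new gate
    final FSPermissionChecker pc = getPermissionChecker();
    // ... then, under the write lock:
    FSDirAttrOp.setStoragePolicy(dir, pc, blockManager, src, policyName);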
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 77d8518..c06b59f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -88,8 +88,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECI
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
@@ -188,8 +186,6 @@ public class FSDirectory implements Closeable {
 
   // precision of access times.
   private final long accessTimePrecision;
-  // whether setStoragePolicy is allowed.
-  private final boolean storagePolicyEnabled;
   // whether quota by storage type is allowed
   private final boolean quotaByStorageTypeEnabled;
 
@@ -318,10 +314,6 @@ public class FSDirectory implements Closeable {
         DFS_NAMENODE_ACCESSTIME_PRECISION_KEY,
         DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT);
 
-    this.storagePolicyEnabled =
-        conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
-                        DFS_STORAGE_POLICY_ENABLED_DEFAULT);
-
     this.quotaByStorageTypeEnabled =
         conf.getBoolean(DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY,
                         DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT);
@@ -438,7 +430,7 @@ public class FSDirectory implements Closeable {
    * These statuses are solely for listing purpose. All other operations
    * on the reserved dirs are disallowed.
    * Operations on sub directories are resolved by
-   * {@link FSDirectory#resolvePath(String, byte[][], FSDirectory)}
+   * {@link FSDirectory#resolvePath(String, FSDirectory)}
    * and conducted directly, without the need to check the reserved dirs.
    *
    * This method should only be invoked once during namenode initialization.
@@ -568,9 +560,6 @@ public class FSDirectory implements Closeable {
     return xattrsEnabled;
   }
   int getXattrMaxSize() { return xattrMaxSize; }
-  boolean isStoragePolicyEnabled() {
-    return storagePolicyEnabled;
-  }
   boolean isAccessTimeSupported() {
     return accessTimePrecision > 0;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 80dc97c..99ad6f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -30,6 +30,9 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
@@ -92,6 +95,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LI
 import static org.apache.hadoop.hdfs.DFSUtil.isParentEntry;
 
 import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.text.CaseUtils;
 import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
@@ -449,6 +454,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   private final int maxCorruptFileBlocksReturn;
   private final boolean isPermissionEnabled;
+  private final boolean isStoragePolicyEnabled;
+  private final boolean isStoragePolicySuperuserOnly;
   private final UserGroupInformation fsOwner;
   private final String supergroup;
   private final boolean standbyShouldCheckpoint;
@@ -825,13 +832,22 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                                  DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
       this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
                                                  DFS_PERMISSIONS_ENABLED_DEFAULT);
+
+      this.isStoragePolicyEnabled =
+          conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
+              DFS_STORAGE_POLICY_ENABLED_DEFAULT);
+      this.isStoragePolicySuperuserOnly =
+          conf.getBoolean(DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY,
+              DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT);
+
       this.snapshotDiffReportLimit =
           conf.getInt(DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT,
               DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT);
 
-      LOG.info("fsOwner             = " + fsOwner);
-      LOG.info("supergroup          = " + supergroup);
-      LOG.info("isPermissionEnabled = " + isPermissionEnabled);
+      LOG.info("fsOwner                = " + fsOwner);
+      LOG.info("supergroup             = " + supergroup);
+      LOG.info("isPermissionEnabled    = " + isPermissionEnabled);
+      LOG.info("isStoragePolicyEnabled = " + isStoragePolicyEnabled);
 
       // block allocation has to be persisted in HA using a shared edits directory
       // so that the standby has up-to-date namespace information
@@ -2327,6 +2343,27 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   /**
+   * Verify that storage policies are enabled and, if they are restricted to
+   * superusers, that the caller has superuser privilege.
+   *
+   * @param operationNameReadable Human-readable operation name, used in
+   *                              exception text
+   * @param checkSuperUser Whether to check for superuser privilege
+   */
+  private void checkStoragePolicyEnabled(final String operationNameReadable,
+      boolean checkSuperUser) throws IOException {
+    if (!isStoragePolicyEnabled) {
+      throw new IOException(String.format(
+          "Failed to %s since %s is set to false.", operationNameReadable,
+          DFS_STORAGE_POLICY_ENABLED_KEY));
+    }
+    if (checkSuperUser && isStoragePolicySuperuserOnly) {
+      checkSuperuserPrivilege(
+          CaseUtils.toCamelCase(operationNameReadable, false));
+    }
+  }
+
+  /**
    * Set the storage policy for a file or a directory.
    *
    * @param src file/directory path
@@ -2335,10 +2372,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   void setStoragePolicy(String src, String policyName) throws IOException {
     final String operationName = "setStoragePolicy";
-    FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
+    checkStoragePolicyEnabled("set storage policy", true);
     final FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker.setOperationType(operationName);
+    FileStatus auditStat;
     try {
       writeLock();
       try {
@@ -2366,9 +2404,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void satisfyStoragePolicy(String src, boolean logRetryCache)
       throws IOException {
     final String operationName = "satisfyStoragePolicy";
+    checkOperation(OperationCategory.WRITE);
+    // make sure storage policy is enabled, otherwise
+    // there is no need to satisfy storage policy.
+    checkStoragePolicyEnabled("satisfy storage policy", false);
     FileStatus auditStat;
     validateStoragePolicySatisfy();
-    checkOperation(OperationCategory.WRITE);
     try {
       writeLock();
       try {
@@ -2389,13 +2430,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   private void validateStoragePolicySatisfy()
       throws UnsupportedActionException, IOException {
-    // make sure storage policy is enabled, otherwise
-    // there is no need to satisfy storage policy.
-    if (!dir.isStoragePolicyEnabled()) {
-      throw new IOException(String.format(
-          "Failed to satisfy storage policy since %s is set to false.",
-          DFS_STORAGE_POLICY_ENABLED_KEY));
-    }
     // checks sps status
     boolean disabled = (blockManager.getSPSManager() == null);
     if (disabled) {
@@ -2418,10 +2452,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   void unsetStoragePolicy(String src) throws IOException {
     final String operationName = "unsetStoragePolicy";
-    FileStatus auditStat;
     checkOperation(OperationCategory.WRITE);
+    checkStoragePolicyEnabled("unset storage policy", true);
     final FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker.setOperationType(operationName);
+    FileStatus auditStat;
     try {
       writeLock();
       try {
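
The user-visible effect, mirroring the new test at the end of this patch (a sketch reusing the test's nonAdmin user and conf; not a verbatim excerpt): with the flag on, a non-superuser call fails even where POSIX permissions on the path would allow it. Note that checkSuperuserPrivilege receives CaseUtils.toCamelCase("set storage policy", false), i.e. "setStoragePolicy", as the operation name.

    FileSystem fsAsNonAdmin = DFSTestUtil.getFileSystemAs(nonAdmin, conf);
    try {
      fsAsNonAdmin.setStoragePolicy(new Path("/foo"), "COLD");
    } catch (AccessControlException e) {
      // "Access denied for user user1. Superuser privilege is required"
    }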
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 551fb44..0ce0be6 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3376,6 +3376,15 @@
 </property>
 
 <property>
+  <name>dfs.storage.policy.permissions.superuser-only</name>
+  <value>false</value>
+  <description>
+    Allow only the superuser role to change the storage policy on files and
+    directories.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.legacy-oiv-image.dir</name>
   <value></value>
   <description>Determines where to save the namespace in the old fsimage format
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStoragePolicyPermissionSettings.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStoragePolicyPermissionSettings.java
new file mode 100644
index 0000000..81f9126
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStoragePolicyPermissionSettings.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestStoragePolicyPermissionSettings {
+
+  private static final short REPL = 1;
+  private static final int SIZE = 128;
+
+  private static Configuration conf;
+  private static MiniDFSCluster cluster;
+  private static DistributedFileSystem fs;
+  private static BlockStoragePolicySuite suite;
+  private static BlockStoragePolicy cold;
+  private static UserGroupInformation nonAdmin;
+  private static UserGroupInformation admin;
+
+  @BeforeClass
+  public static void clusterSetUp() throws IOException {
+    conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    suite = BlockStoragePolicySuite.createDefaultSuite();
+    cold = suite.getPolicy("COLD");
+    nonAdmin = UserGroupInformation.createUserForTesting(
+        "user1", new String[] {"test"});
+    admin = UserGroupInformation.createUserForTesting("user2",
+        new String[]{"supergroup"});
+  }
+
+  @AfterClass
+  public static void clusterShutdown() throws IOException {
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void setFSNameSystemFinalField(String field, boolean value)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = FSNamesystem.class.getDeclaredField(field);
+    f.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
+    f.set(cluster.getNamesystem(), value);
+  }
+
+  private void setStoragePolicyPermissions(boolean isStoragePolicyEnabled,
+                                           boolean isStoragePolicySuperuserOnly)
+      throws NoSuchFieldException, IllegalAccessException {
+    setFSNameSystemFinalField("isStoragePolicyEnabled", isStoragePolicyEnabled);
+    setFSNameSystemFinalField("isStoragePolicySuperuserOnly",
+        isStoragePolicySuperuserOnly);
+  }
+
+  @Test
+  public void testStoragePolicyPermissionDefault() throws Exception {
+    Path foo = new Path("/foo");
+    DFSTestUtil.createFile(fs, foo, SIZE, REPL, 0);
+    setStoragePolicyPermissions(true, false);
+    // Test default user fails
+    final FileSystem fileSystemNonAdmin =
+        DFSTestUtil.getFileSystemAs(nonAdmin, conf);
+    LambdaTestUtils.intercept(AccessControlException.class,
+        "Permission denied: user=user1",
+        "Only super user can set storage policy.",
+        () -> fileSystemNonAdmin.setStoragePolicy(foo, cold.getName()));
+    // widen privilege
+    fs.setPermission(foo, new FsPermission("777"));
+    assertNotEquals(fs.getStoragePolicy(foo), cold);
+    LambdaTestUtils.eval(
+        () -> fileSystemNonAdmin.setStoragePolicy(foo, cold.getName()));
+    assertEquals(fs.getStoragePolicy(foo), cold);
+  }
+
+  @Test
+  public void testStoragePolicyPermissionAdmins() throws Exception {
+    Path foo = new Path("/foo");
+    DFSTestUtil.createFile(fs, foo, SIZE, REPL, 0);
+    fs.setPermission(foo, new FsPermission("777"));
+    // Test only super can set storage policies
+    setStoragePolicyPermissions(true, true);
+
+    final FileSystem fileSystemNonAdmin =
+        DFSTestUtil.getFileSystemAs(nonAdmin, conf);
+
+    LambdaTestUtils.intercept(AccessControlException.class,
+        "Access denied for user user1. Superuser privilege is required",
+        "Only super user can set storage policy.",
+        () -> fileSystemNonAdmin.setStoragePolicy(foo, cold.getName()));
+
+    final FileSystem fileSystemAdmin =
+        DFSTestUtil.getFileSystemAs(admin, conf);
+    assertNotEquals(fs.getStoragePolicy(foo), cold);
+    LambdaTestUtils.eval(
+        () -> fileSystemAdmin.setStoragePolicy(foo, cold.getName()));
+    assertEquals(fs.getStoragePolicy(foo), cold);
+  }
+
+  @Test
+  public void testStoragePolicyPermissionDisabled() throws Exception {
+    Path foo = new Path("/foo");
+    DFSTestUtil.createFile(fs, foo, SIZE, REPL, 0);
+    fs.setPermission(foo, new FsPermission("777"));
+    setStoragePolicyPermissions(false, false);
+    final FileSystem fileSystemAdmin =
+        DFSTestUtil.getFileSystemAs(admin, conf);
+    LambdaTestUtils.intercept(IOException.class,
+        "Failed to set storage policy " +
+            "since dfs.storage.policy.enabled is set to false.",
+        "Storage policy settings are disabled.",
+        () -> fileSystemAdmin.setStoragePolicy(foo, cold.getName()));
+    assertEquals(suite.getDefaultPolicy(), fs.getStoragePolicy(foo));
+  }
+}
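
A note on the test's reflection helper: isStoragePolicyEnabled and isStoragePolicySuperuserOnly are final fields read once from the Configuration, so the test rewrites them in place rather than restarting the MiniDFSCluster for every combination (the Field#modifiers trick it relies on stops working on JDK 12 and later). An alternative sketch that avoids reflection, at the cost of one cluster per configuration (an assumption, not part of the patch):

    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(
        DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY, true);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    cluster.waitActive();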

