This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7615945  HDFS-14814. RBF: RouterQuotaUpdateService supports inherited rule. Contributed by Jinglun.
7615945 is described below

commit 761594549ec0c6bab50a28a7eb6c741aec7239d7
Author: Ayush Saxena <ayushsax...@apache.org>
AuthorDate: Tue Oct 8 14:01:44 2019 +0530

    HDFS-14814. RBF: RouterQuotaUpdateService supports inherited rule. Contributed by Jinglun.
---
 .../hdfs/server/federation/router/Quota.java       | 70 ++++++++++++++++-
 .../federation/router/RouterQuotaManager.java      | 27 +++++++
 .../router/RouterQuotaUpdateService.java           | 49 ++++++++----
 .../federation/router/TestDisableRouterQuota.java  | 10 +++
 .../server/federation/router/TestRouterQuota.java  | 88 ++++++++++++++++++++++
 5 files changed, 226 insertions(+), 18 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
index 48f0b96..df5f319 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
@@ -23,6 +23,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
 import java.util.Set;
 
 import org.apache.hadoop.fs.QuotaUsage;
@@ -70,13 +72,30 @@ public class Quota {
    */
   public void setQuota(String path, long namespaceQuota,
       long storagespaceQuota, StorageType type) throws IOException {
+    setQuotaInternal(path, null, namespaceQuota, storagespaceQuota, type);
+  }
+
+  /**
+   * Set quota for the federation path.
+   * @param path Federation path.
+   * @param locations Locations of the Federation path.
+   * @param namespaceQuota Name space quota.
+   * @param storagespaceQuota Storage space quota.
+   * @param type StorageType that the space quota is intended to be set on.
+   * @throws IOException If the quota system is disabled.
+   */
+  void setQuotaInternal(String path, List<RemoteLocation> locations,
+      long namespaceQuota, long storagespaceQuota, StorageType type)
+      throws IOException {
     rpcServer.checkOperation(OperationCategory.WRITE);
     if (!router.isQuotaEnabled()) {
       throw new IOException("The quota system is disabled in Router.");
     }
 
     // Set quota for current path and its children mount table path.
-    final List<RemoteLocation> locations = getQuotaRemoteLocations(path);
+    if (locations == null) {
+      locations = getQuotaRemoteLocations(path);
+    }
     if (LOG.isDebugEnabled()) {
       for (RemoteLocation loc : locations) {
         LOG.debug("Set quota for path: nsId: {}, dest: {}.",
@@ -92,12 +111,23 @@ public class Quota {
   }
 
   /**
-   * Get quota usage for the federation path.
+   * Get aggregated quota usage for the federation path.
    * @param path Federation path.
    * @return Aggregated quota.
    * @throws IOException If the quota system is disabled.
    */
   public QuotaUsage getQuotaUsage(String path) throws IOException {
+    return aggregateQuota(getEachQuotaUsage(path));
+  }
+
+  /**
+   * Get quota usage of each remote location for the federation path.
+   * @param path Federation path.
+   * @return quota usage for each remote location.
+   * @throws IOException If the quota system is disabled.
+   */
+  Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
+      throws IOException {
     rpcServer.checkOperation(OperationCategory.READ);
     if (!router.isQuotaEnabled()) {
       throw new IOException("The quota system is disabled in Router.");
@@ -109,7 +139,39 @@ public class Quota {
     Map<RemoteLocation, QuotaUsage> results = rpcClient.invokeConcurrent(
         quotaLocs, method, true, false, QuotaUsage.class);
 
-    return aggregateQuota(results);
+    return results;
+  }
+
+  /**
+   * Get global quota for the federation path.
+   * @param path Federation path.
+   * @return global quota for path.
+   * @throws IOException If the quota system is disabled.
+   */
+  QuotaUsage getGlobalQuota(String path) throws IOException {
+    if (!router.isQuotaEnabled()) {
+      throw new IOException("The quota system is disabled in Router.");
+    }
+
+    long nQuota = HdfsConstants.QUOTA_RESET;
+    long sQuota = HdfsConstants.QUOTA_RESET;
+    RouterQuotaManager manager = this.router.getQuotaManager();
+    TreeMap<String, RouterQuotaUsage> pts =
+        manager.getParentsContainingQuota(path);
+    Entry<String, RouterQuotaUsage> entry = pts.lastEntry();
+    while (entry != null && (nQuota == HdfsConstants.QUOTA_RESET
+        || sQuota == HdfsConstants.QUOTA_RESET)) {
+      String ppath = entry.getKey();
+      QuotaUsage quota = entry.getValue();
+      if (nQuota == HdfsConstants.QUOTA_RESET) {
+        nQuota = quota.getQuota();
+      }
+      if (sQuota == HdfsConstants.QUOTA_RESET) {
+        sQuota = quota.getSpaceQuota();
+      }
+      entry = pts.lowerEntry(ppath);
+    }
+    return new QuotaUsage.Builder().quota(nQuota).spaceQuota(sQuota).build();
   }
 
   /**
@@ -157,7 +219,7 @@ public class Quota {
    * @param results Quota query result.
    * @return Aggregated Quota.
    */
-  private QuotaUsage aggregateQuota(Map<RemoteLocation, QuotaUsage> results) {
+  QuotaUsage aggregateQuota(Map<RemoteLocation, QuotaUsage> results) {
     long nsCount = 0;
     long ssCount = 0;
     long nsQuota = HdfsConstants.QUOTA_RESET;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
index e818f5a..f478436 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation.router;
 import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.isParentEntry;
 
 import java.util.HashSet;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -114,6 +115,32 @@ public class RouterQuotaManager {
   }
 
   /**
+   * Get parent paths (including itself) and quotas of the specified federation
+   * path. Only parents containing quota are returned.
+   * @param childPath Federated path.
+   * @return TreeMap of parent paths and quotas.
+   */
+  TreeMap<String, RouterQuotaUsage> getParentsContainingQuota(
+      String childPath) {
+    TreeMap<String, RouterQuotaUsage> res = new TreeMap<>();
+    readLock.lock();
+    try {
+      Entry<String, RouterQuotaUsage> entry = this.cache.floorEntry(childPath);
+      while (entry != null) {
+        String mountPath = entry.getKey();
+        RouterQuotaUsage quota = entry.getValue();
+        if (isQuotaSet(quota) && isParentEntry(childPath, mountPath)) {
+          res.put(mountPath, quota);
+        }
+        entry = this.cache.lowerEntry(mountPath);
+      }
+      return res;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
    * Put new entity into cache.
    * @param path Mount table path.
    * @param quotaUsage Corresponding cache value.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
index dd21e1a..7982bc9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
@@ -18,16 +18,20 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.QuotaUsage;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
@@ -79,6 +83,7 @@ public class RouterQuotaUpdateService extends PeriodicService {
     try {
       List<MountTable> updateMountTables = new LinkedList<>();
       List<MountTable> mountTables = getQuotaSetMountTables();
+      Map<RemoteLocation, QuotaUsage> remoteQuotaUsage = new HashMap<>();
       for (MountTable entry : mountTables) {
         String src = entry.getSourcePath();
         RouterQuotaUsage oldQuota = entry.getQuota();
@@ -102,25 +107,17 @@ public class RouterQuotaUpdateService extends PeriodicService {
           // Call RouterRpcServer#getQuotaUsage for getting current quota usage.
           // If any exception occurs catch it and proceed with other entries.
           try {
-            currentQuotaUsage = this.rpcServer.getQuotaModule()
-                .getQuotaUsage(src);
+            Quota quotaModule = this.rpcServer.getQuotaModule();
+            Map<RemoteLocation, QuotaUsage> usageMap =
+                quotaModule.getEachQuotaUsage(src);
+            currentQuotaUsage = quotaModule.aggregateQuota(usageMap);
+            remoteQuotaUsage.putAll(usageMap);
           } catch (IOException ioe) {
             LOG.error("Unable to get quota usage for " + src, ioe);
             continue;
           }
         }
 
-        // If quota is not set in some subclusters under federation path,
-        // set quota for this path.
-        if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_RESET) {
-          try {
-            this.rpcServer.setQuota(src, nsQuota, ssQuota, null);
-          } catch (IOException ioe) {
-            LOG.error("Unable to set quota at remote location for "
-                + src, ioe);
-          }
-        }
-
         RouterQuotaUsage newQuota = generateNewQuota(oldQuota,
             currentQuotaUsage);
         this.quotaManager.put(src, newQuota);
@@ -139,12 +136,36 @@ public class RouterQuotaUpdateService extends PeriodicService {
         }
       }
 
+      // Fix inconsistent quota.
+      for (Entry<RemoteLocation, QuotaUsage> en : remoteQuotaUsage
+          .entrySet()) {
+        RemoteLocation remoteLocation = en.getKey();
+        QuotaUsage currentQuota = en.getValue();
+        fixGlobalQuota(remoteLocation, currentQuota);
+      }
+
       updateMountTableEntries(updateMountTables);
     } catch (IOException e) {
       LOG.error("Quota cache updated error.", e);
     }
   }
 
+  private void fixGlobalQuota(RemoteLocation location, QuotaUsage remoteQuota)
+      throws IOException {
+    QuotaUsage gQuota =
+        this.rpcServer.getQuotaModule().getGlobalQuota(location.getSrc());
+    if (remoteQuota.getQuota() != gQuota.getQuota()
+        || remoteQuota.getSpaceQuota() != gQuota.getSpaceQuota()) {
+      this.rpcServer.getQuotaModule()
+          .setQuotaInternal(location.getSrc(), Arrays.asList(location),
+              gQuota.getQuota(), gQuota.getSpaceQuota(), null);
+      LOG.info("[Fix Quota] src={} dst={} oldQuota={}/{} newQuota={}/{}",
+          location.getSrc(), location, remoteQuota.getQuota(),
+          remoteQuota.getSpaceQuota(), gQuota.getQuota(),
+          gQuota.getSpaceQuota());
+    }
+  }
+
   /**
    * Get mount table store management interface.
    * @return MountTableStore instance.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableRouterQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableRouterQuota.java
index 081f604..28d12fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableRouterQuota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableRouterQuota.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import java.io.IOException;
 
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -92,4 +93,13 @@ public class TestDisableRouterQuota {
     }
   }
 
+  @Test
+  public void testGetGlobalQuota() throws Exception {
+    LambdaTestUtils.intercept(IOException.class,
+        "The quota system is disabled in Router.",
+        "The getGlobalQuota call should fail.", () -> {
+          Quota quotaModule = router.getRpcServer().getQuotaModule();
+          quotaModule.getGlobalQuota("/test");
+        });
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
index 5e36262..9873a2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
@@ -868,4 +868,92 @@ public class TestRouterQuota {
     routerFs.listStatus(path);
     routerFs.getContentSummary(path);
   }
+
+  @Test
+  public void testGetGlobalQuota() throws Exception {
+    long nsQuota = 5;
+    long ssQuota = 3 * BLOCK_SIZE;
+    prepareGlobalQuotaTestMountTable(nsQuota, ssQuota);
+
+    Quota qModule = routerContext.getRouter().getRpcServer().getQuotaModule();
+    QuotaUsage qu = qModule.getGlobalQuota("/dir-1");
+    assertEquals(nsQuota, qu.getQuota());
+    assertEquals(ssQuota, qu.getSpaceQuota());
+    qu = qModule.getGlobalQuota("/dir-1/dir-2");
+    assertEquals(nsQuota, qu.getQuota());
+    assertEquals(ssQuota * 2, qu.getSpaceQuota());
+    qu = qModule.getGlobalQuota("/dir-1/dir-2/dir-3");
+    assertEquals(nsQuota, qu.getQuota());
+    assertEquals(ssQuota * 2, qu.getSpaceQuota());
+    qu = qModule.getGlobalQuota("/dir-4");
+    assertEquals(-1, qu.getQuota());
+    assertEquals(-1, qu.getSpaceQuota());
+  }
+
+  @Test
+  public void testFixGlobalQuota() throws Exception {
+    long nsQuota = 5;
+    long ssQuota = 3 * BLOCK_SIZE;
+    final FileSystem nnFs = nnContext1.getFileSystem();
+    prepareGlobalQuotaTestMountTable(nsQuota, ssQuota);
+
+    QuotaUsage qu = nnFs.getQuotaUsage(new Path("/dir-1"));
+    assertEquals(nsQuota, qu.getQuota());
+    assertEquals(ssQuota, qu.getSpaceQuota());
+    qu = nnFs.getQuotaUsage(new Path("/dir-2"));
+    assertEquals(nsQuota, qu.getQuota());
+    assertEquals(ssQuota * 2, qu.getSpaceQuota());
+    qu = nnFs.getQuotaUsage(new Path("/dir-3"));
+    assertEquals(nsQuota, qu.getQuota());
+    assertEquals(ssQuota * 2, qu.getSpaceQuota());
+    qu = nnFs.getQuotaUsage(new Path("/dir-4"));
+    assertEquals(-1, qu.getQuota());
+    assertEquals(-1, qu.getSpaceQuota());
+  }
+
+  /**
+   * Add four mount tables.
+   * /dir-1              --> ns0---/dir-1 [nsQuota, ssQuota]
+   * /dir-1/dir-2        --> ns0---/dir-2 [QUOTA_UNSET, ssQuota * 2]
+   * /dir-1/dir-2/dir-3  --> ns0---/dir-3 [QUOTA_UNSET, QUOTA_UNSET]
+   * /dir-4              --> ns0---/dir-4 [QUOTA_UNSET, QUOTA_UNSET]
+   *
+   * Expect four remote locations' global quotas.
+   * ns0---/dir-1 --> [nsQuota, ssQuota]
+   * ns0---/dir-2 --> [nsQuota, ssQuota * 2]
+   * ns0---/dir-3 --> [nsQuota, ssQuota * 2]
+   * ns0---/dir-4 --> [-1, -1]
+   */
+  private void prepareGlobalQuotaTestMountTable(long nsQuota, long ssQuota)
+      throws IOException {
+    final FileSystem nnFs = nnContext1.getFileSystem();
+
+    // Create destination directory
+    nnFs.mkdirs(new Path("/dir-1"));
+    nnFs.mkdirs(new Path("/dir-2"));
+    nnFs.mkdirs(new Path("/dir-3"));
+    nnFs.mkdirs(new Path("/dir-4"));
+
+    MountTable mountTable = MountTable.newInstance("/dir-1",
+        Collections.singletonMap("ns0", "/dir-1"));
+    mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable);
+    mountTable = MountTable.newInstance("/dir-1/dir-2",
+        Collections.singletonMap("ns0", "/dir-2"));
+    mountTable.setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota * 2)
+        .build());
+    addMountTable(mountTable);
+    mountTable = MountTable.newInstance("/dir-1/dir-2/dir-3",
+        Collections.singletonMap("ns0", "/dir-3"));
+    addMountTable(mountTable);
+    mountTable = MountTable.newInstance("/dir-4",
+        Collections.singletonMap("ns0", "/dir-4"));
+    addMountTable(mountTable);
+
+    // Ensure setQuota RPC was invoked and mount table was updated.
+    RouterQuotaUpdateService updateService = routerContext.getRouter()
+        .getQuotaCacheUpdateService();
+    updateService.periodicInvoke();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to