This is an automated email from the ASF dual-hosted git repository.

zanderxu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 74d30a5dce70 HDFS-17532. RBF: Allow router state store cache update to overwrite and delete in parallel (#6839)
74d30a5dce70 is described below

commit 74d30a5dce704543578baac12b0cff9684fd9d44
Author: Felix Nguyen <23214709+kokonguyen...@users.noreply.github.com>
AuthorDate: Tue May 28 11:17:08 2024 +0800

    HDFS-17532. RBF: Allow router state store cache update to overwrite and delete in parallel (#6839)
---
 .../server/federation/router/RBFConfigKeys.java    |  3 +
 .../server/federation/store/CachedRecordStore.java | 19 ++---
 .../federation/store/driver/StateStoreDriver.java  | 89 +++++++++++++++++++++-
 .../store/driver/impl/StateStoreFileBaseImpl.java  |  1 +
 .../store/driver/impl/StateStoreMySQLImpl.java     |  1 +
 .../store/driver/impl/StateStoreZooKeeperImpl.java |  1 +
 .../src/main/resources/hdfs-rbf-default.xml        | 10 +++
 .../src/site/markdown/HDFSRouterFederation.md      |  1 +
 .../store/records/MockStateStoreDriver.java        |  1 +
 9 files changed, 113 insertions(+), 13 deletions(-)
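
For readers skimming the diff below, the change hinges on one new knob: dfs.federation.router.store.driver.async.override.max.threads (default -1). A positive value switches StateStoreDriver into async mode for overwrites and deletions; anything else keeps the existing synchronous path. A minimal sketch of setting it programmatically (the class name AsyncOverrideConfigExample is illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;

    public class AsyncOverrideConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Positive value => StateStoreDriver.init() builds a ThreadPoolExecutor and runs
        // overwrite/delete work asynchronously; -1 (the default) stays synchronous.
        conf.setInt(RBFConfigKeys.FEDERATION_STORE_DRIVER_ASYNC_OVERRIDE_MAX_THREADS, 4);
      }
    }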

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 5189b6b13459..512b1936f432 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -218,6 +218,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
       FEDERATION_STORE_PREFIX + "driver.class";
   public static final Class<? extends StateStoreDriver>
       FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreZooKeeperImpl.class;
+  public static final String FEDERATION_STORE_DRIVER_ASYNC_OVERRIDE_MAX_THREADS =
+      FEDERATION_STORE_PREFIX + "driver.async.override.max.threads";
+  public static final int FEDERATION_STORE_DRIVER_ASYNC_OVERRIDE_MAX_THREADS_DEFAULT = -1;
 
   public static final String FEDERATION_STORE_CONNECTION_TEST_MS =
       FEDERATION_STORE_PREFIX + "connection.test";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
index 3a2995eba2a6..0686f6b302e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -173,7 +172,7 @@ public abstract class CachedRecordStore<R extends BaseRecord>
    */
   public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
     List<R> commitRecords = new ArrayList<>();
-    List<R> toDeleteRecords = new ArrayList<>();
+    List<R> deleteRecords = new ArrayList<>();
     List<R> newRecords = query.getRecords();
     long currentDriverTime = query.getTimestamp();
     if (newRecords == null || currentDriverTime <= 0) {
@@ -184,22 +183,18 @@ public abstract class CachedRecordStore<R extends BaseRecord>
       if (record.shouldBeDeleted(currentDriverTime)) {
         String recordName = StateStoreUtils.getRecordName(record.getClass());
         LOG.info("State Store record to delete {}: {}", recordName, record);
-        toDeleteRecords.add(record);
+        deleteRecords.add(record);
      } else if (!record.isExpired() && record.checkExpired(currentDriverTime)) {
         String recordName = StateStoreUtils.getRecordName(record.getClass());
         LOG.info("Override State Store record {}: {}", recordName, record);
         commitRecords.add(record);
       }
     }
-    if (commitRecords.size() > 0) {
-      getDriver().putAll(commitRecords, true, false);
-    }
-    if (!toDeleteRecords.isEmpty()) {
-      for (Map.Entry<R, Boolean> entry : getDriver().removeMultiple(toDeleteRecords).entrySet()) {
-        if (entry.getValue()) {
-          newRecords.remove(entry.getKey());
-        }
-      }
+    List<R> removedRecords = getDriver().handleOverwriteAndDelete(commitRecords, deleteRecords);
+    // In driver async mode, driver will return null and skip the next block.
+    // newRecords might be stale as a result but will sort itself out the next override cycle.
+    if (removedRecords != null && !removedRecords.isEmpty()) {
+      newRecords.removeAll(removedRecords);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
index dfd6c97ed36e..274b14b24f24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
@@ -17,13 +17,22 @@
  */
 package org.apache.hadoop.hdfs.server.federation.store.driver;
 
+import java.io.IOException;
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
@@ -54,6 +63,9 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
   /** State Store metrics. */
   private StateStoreMetrics metrics;
 
+  /** Thread pool to delegate overwrite and deletion asynchronously. */
+  private ThreadPoolExecutor executor = null;
+
   /**
    * Initialize the state store connection.
    *
@@ -88,6 +100,18 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
         return false;
       }
     }
+
+    int nThreads = conf.getInt(
+        RBFConfigKeys.FEDERATION_STORE_DRIVER_ASYNC_OVERRIDE_MAX_THREADS,
+        RBFConfigKeys.FEDERATION_STORE_DRIVER_ASYNC_OVERRIDE_MAX_THREADS_DEFAULT);
+    if (nThreads > 0) {
+      executor = new ThreadPoolExecutor(nThreads, nThreads, 1L, TimeUnit.MINUTES,
+          new LinkedBlockingQueue<>());
+      executor.allowCoreThreadTimeOut(true);
+      LOG.info("Init StateStoreDriver in async mode with {} threads.", 
nThreads);
+    } else {
+      LOG.info("Init StateStoreDriver in sync mode.");
+    }
     return true;
   }
 
@@ -169,7 +193,12 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
    *
   * @throws Exception if something goes wrong while closing the state store driver connection.
    */
-  public abstract void close() throws Exception;
+  public void close() throws Exception {
+    if (executor != null) {
+      executor.shutdown();
+      executor = null;
+    }
+  }
 
   /**
    * Returns the current time synchronization from the underlying store.
@@ -206,4 +235,62 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
     }
     return hostname;
   }
+
+  /**
+   * Try to overwrite records in commitRecords and remove records in deleteRecords.
+   * Should return null if async mode is used. Else return removed records.
+   * @param commitRecords records to overwrite in state store
+   * @param deleteRecords records to remove from state store
+   * @param <R> record class
+   * @throws IOException when there is a failure during overwriting or deletion
+   * @return null if async mode is used, else removed records
+   */
+  public <R extends BaseRecord> List<R> handleOverwriteAndDelete(List<R> commitRecords,
+      List<R> deleteRecords) throws IOException {
+    List<R> result = null;
+    try {
+      // Overwrite all expired records.
+      if (commitRecords != null && !commitRecords.isEmpty()) {
+        Runnable overwriteCallable =
+            () -> {
+              try {
+                putAll(commitRecords, true, false);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            };
+        if (executor != null) {
+          executor.execute(overwriteCallable);
+        } else {
+          overwriteCallable.run();
+        }
+      }
+
+      // Delete all deletable records.
+      if (deleteRecords != null && !deleteRecords.isEmpty()) {
+        Map<R, Boolean> removedRecords = new HashMap<>();
+        Runnable deletionCallable = () -> {
+          try {
+            removedRecords.putAll(removeMultiple(deleteRecords));
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        };
+        if (executor != null) {
+          executor.execute(deletionCallable);
+        } else {
+          result = new ArrayList<>();
+          deletionCallable.run();
+          for (Map.Entry<R, Boolean> entry : removedRecords.entrySet()) {
+            if (entry.getValue()) {
+              result.add(entry.getKey());
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return result;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
index f9f04f187bb1..07ca94649b80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
@@ -201,6 +201,7 @@ public abstract class StateStoreFileBaseImpl
 
   @Override
   public void close() throws Exception {
+    super.close();
     if (this.concurrentStoreAccessPool != null) {
       this.concurrentStoreAccessPool.shutdown();
      boolean isTerminated = this.concurrentStoreAccessPool.awaitTermination(5, TimeUnit.SECONDS);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
index 43d65e4023e3..d3a8a063e0f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
@@ -125,6 +125,7 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
 
   @Override
   public void close() throws Exception {
+    super.close();
     connectionFactory.shutdown();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
index 0e72cf417565..4b45197f63e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
@@ -140,6 +140,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
 
   @Override
   public void close() throws Exception {
+    super.close();
     if (executorService != null) {
       executorService.shutdown();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index ec4fa46ecc35..c49f57667542 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -370,6 +370,16 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.store.driver.async.override.max.threads</name>
+    <value>-1</value>
+    <description>
+      Number of threads used by StateStoreDriver to overwrite and delete records asynchronously.
+      Only used by MembershipStore and RouterStore. Non-positive values will make StateStoreDriver
+      run in sync mode.
+    </description>
+  </property>
+
   <property>
     <name>dfs.federation.router.store.connection.test</name>
     <value>60000</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
index 9d565f3c4248..ed62aec7209c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
@@ -469,6 +469,7 @@ The connection to the State Store and the internal caching at the Router.
 | dfs.federation.router.store.connection.test | 60000 | How often to check for the connection to the State Store in milliseconds. |
 | dfs.federation.router.cache.ttl | 60000 | How often to refresh the State Store caches in milliseconds. |
 | dfs.federation.router.store.membership.expiration | 300000 | Expiration time in milliseconds for a membership record. |
+| dfs.federation.router.store.driver.async.override.max.threads |  | Number of threads to overwrite and delete records asynchronously when overriding. |
 | dfs.federation.router.mount-table.cache.update | false | If true, Mount table cache is updated whenever a mount table entry is added, modified or removed for all the routers. |
 | dfs.federation.router.mount-table.cache.update.timeout | 1m | Max time to wait for all the routers to finish their mount table cache update. |
 | dfs.federation.router.mount-table.cache.update.client.max.time | 5m | Max time a RouterClient connection can be cached. |
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
index d0821a1711b5..24874375e24b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
@@ -58,6 +58,7 @@ public class MockStateStoreDriver extends StateStoreBaseImpl {
 
   @Override
   public void close() throws Exception {
+    super.close();
     VALUE_MAP.clear();
     initialized = false;
   }
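
As a reading aid for the CachedRecordStore hunk above: the new handleOverwriteAndDelete contract returns null in async mode, so callers must tolerate a stale cache until the next refresh cycle. A sketch of the expected caller pattern (OverrideCallerSketch and applyOverride are illustrative names, not from this commit):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
    import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;

    public final class OverrideCallerSketch {
      // Mirrors the pattern CachedRecordStore.overrideExpiredRecords follows after this change.
      static <R extends BaseRecord> void applyOverride(StateStoreDriver driver,
          List<R> commitRecords, List<R> deleteRecords, List<R> cachedRecords) throws IOException {
        List<R> removed = driver.handleOverwriteAndDelete(commitRecords, deleteRecords);
        // Async mode: the driver returns null; the cached records are reconciled on the next cycle.
        // Sync mode: records confirmed removed are pruned from the cache immediately.
        if (removed != null && !removed.isEmpty()) {
          cachedRecords.removeAll(removed);
        }
      }
    }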

