This is an automated email from the ASF dual-hosted git repository. zanderxu pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 74d30a5dce70 HDFS-17532. RBF: Allow router state store cache update to overwrite and delete in parallel (#6839) 74d30a5dce70 is described below commit 74d30a5dce704543578baac12b0cff9684fd9d44 Author: Felix Nguyen <23214709+kokonguyen...@users.noreply.github.com> AuthorDate: Tue May 28 11:17:08 2024 +0800 HDFS-17532. RBF: Allow router state store cache update to overwrite and delete in parallel (#6839) --- .../server/federation/router/RBFConfigKeys.java | 3 + .../server/federation/store/CachedRecordStore.java | 19 ++--- .../federation/store/driver/StateStoreDriver.java | 89 +++++++++++++++++++++- .../store/driver/impl/StateStoreFileBaseImpl.java | 1 + .../store/driver/impl/StateStoreMySQLImpl.java | 1 + .../store/driver/impl/StateStoreZooKeeperImpl.java | 1 + .../src/main/resources/hdfs-rbf-default.xml | 10 +++ .../src/site/markdown/HDFSRouterFederation.md | 1 + .../store/records/MockStateStoreDriver.java | 1 + 9 files changed, 113 insertions(+), 13 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 5189b6b13459..512b1936f432 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -218,6 +218,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_STORE_PREFIX + "driver.class"; public static final Class<? 
extends StateStoreDriver> FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreZooKeeperImpl.class; + public static final String FEDERATION_STORE_DRIVER_ASYNC_OVERRIDE_MAX_THREADS = + FEDERATION_STORE_PREFIX + "driver.async.override.max.threads"; + public static final int FEDERATION_STORE_DRIVER_ASYNC_OVERRIDE_MAX_THREADS_DEFAULT = -1; public static final String FEDERATION_STORE_CONNECTION_TEST_MS = FEDERATION_STORE_PREFIX + "connection.test"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java index 3a2995eba2a6..0686f6b302e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -173,7 +172,7 @@ public abstract class CachedRecordStore<R extends BaseRecord> */ public void overrideExpiredRecords(QueryResult<R> query) throws IOException { List<R> commitRecords = new ArrayList<>(); - List<R> toDeleteRecords = new ArrayList<>(); + List<R> deleteRecords = new ArrayList<>(); List<R> newRecords = query.getRecords(); long currentDriverTime = query.getTimestamp(); if (newRecords == null || currentDriverTime <= 0) { @@ -184,22 +183,18 @@ public abstract class CachedRecordStore<R extends BaseRecord> if (record.shouldBeDeleted(currentDriverTime)) { String recordName = StateStoreUtils.getRecordName(record.getClass()); LOG.info("State Store record to delete {}: {}", recordName, record); - 
toDeleteRecords.add(record); + deleteRecords.add(record); } else if (!record.isExpired() && record.checkExpired(currentDriverTime)) { String recordName = StateStoreUtils.getRecordName(record.getClass()); LOG.info("Override State Store record {}: {}", recordName, record); commitRecords.add(record); } } - if (commitRecords.size() > 0) { - getDriver().putAll(commitRecords, true, false); - } - if (!toDeleteRecords.isEmpty()) { - for (Map.Entry<R, Boolean> entry : getDriver().removeMultiple(toDeleteRecords).entrySet()) { - if (entry.getValue()) { - newRecords.remove(entry.getKey()); - } - } + List<R> removedRecords = getDriver().handleOverwriteAndDelete(commitRecords, deleteRecords); + // In driver async mode, driver will return null and skip the next block. + // newRecords might be stale as a result but will sort itself out the next override cycle. + if (removedRecords != null && !removedRecords.isEmpty()) { + newRecords.removeAll(removedRecords); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java index dfd6c97ed36e..274b14b24f24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java @@ -17,13 +17,22 @@ */ package org.apache.hadoop.hdfs.server.federation.store.driver; +import java.io.IOException; import java.net.InetAddress; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import 
org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; @@ -54,6 +63,9 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { /** State Store metrics. */ private StateStoreMetrics metrics; + /** Thread pool to delegate overwrite and deletion asynchronously. */ + private ThreadPoolExecutor executor = null; + /** * Initialize the state store connection. * @@ -88,6 +100,18 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { return false; } } + + int nThreads = conf.getInt( + RBFConfigKeys.FEDERATION_STORE_DRIVER_ASYNC_OVERRIDE_MAX_THREADS, + RBFConfigKeys.FEDERATION_STORE_DRIVER_ASYNC_OVERRIDE_MAX_THREADS_DEFAULT); + if (nThreads > 0) { + executor = new ThreadPoolExecutor(nThreads, nThreads, 1L, TimeUnit.MINUTES, + new LinkedBlockingQueue<>()); + executor.allowCoreThreadTimeOut(true); + LOG.info("Init StateStoreDriver in async mode with {} threads.", nThreads); + } else { + LOG.info("Init StateStoreDriver in sync mode."); + } return true; } @@ -169,7 +193,12 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { * * @throws Exception if something goes wrong while closing the state store driver connection. */ - public abstract void close() throws Exception; + public void close() throws Exception { + if (executor != null) { + executor.shutdown(); + executor = null; + } + } /** * Returns the current time synchronization from the underlying store. 
@@ -206,4 +235,62 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { } return hostname; } + + /** + * Try to overwrite records in commitRecords and remove records in deleteRecords. + * Should return null if async mode is used. Else return removed records. + * @param commitRecords records to overwrite in state store + * @param deleteRecords records to remove from state store + * @param <R> record class + * @throws IOException when there is a failure during overwriting or deletion + * @return null if async mode is used, else removed records + */ + public <R extends BaseRecord> List<R> handleOverwriteAndDelete(List<R> commitRecords, + List<R> deleteRecords) throws IOException { + List<R> result = null; + try { + // Overwrite all expired records. + if (commitRecords != null && !commitRecords.isEmpty()) { + Runnable overwriteCallable = + () -> { + try { + putAll(commitRecords, true, false); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + if (executor != null) { + executor.execute(overwriteCallable); + } else { + overwriteCallable.run(); + } + } + + // Delete all deletable records. 
+ if (deleteRecords != null && !deleteRecords.isEmpty()) { + Map<R, Boolean> removedRecords = new HashMap<>(); + Runnable deletionCallable = () -> { + try { + removedRecords.putAll(removeMultiple(deleteRecords)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + if (executor != null) { + executor.execute(deletionCallable); + } else { + result = new ArrayList<>(); + deletionCallable.run(); + for (Map.Entry<R, Boolean> entry : removedRecords.entrySet()) { + if (entry.getValue()) { + result.add(entry.getKey()); + } + } + } + } + } catch (Exception e) { + throw new IOException(e); + } + return result; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java index f9f04f187bb1..07ca94649b80 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java @@ -201,6 +201,7 @@ public abstract class StateStoreFileBaseImpl @Override public void close() throws Exception { + super.close(); if (this.concurrentStoreAccessPool != null) { this.concurrentStoreAccessPool.shutdown(); boolean isTerminated = this.concurrentStoreAccessPool.awaitTermination(5, TimeUnit.SECONDS); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java index 43d65e4023e3..d3a8a063e0f9 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java @@ -125,6 +125,7 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl { @Override public void close() throws Exception { + super.close(); connectionFactory.shutdown(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java index 0e72cf417565..4b45197f63e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java @@ -140,6 +140,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { @Override public void close() throws Exception { + super.close(); if (executorService != null) { executorService.shutdown(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index ec4fa46ecc35..c49f57667542 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -370,6 +370,16 @@ </description> </property> + <property> + <name>dfs.federation.router.store.driver.async.override.max.threads</name> + <value>-1</value> + <description> + Number of threads used by StateStoreDriver to overwrite and delete records asynchronously. + Only used by MembershipStore and RouterStore. 
Non-positive values will make StateStoreDriver + run in sync mode. + </description> + </property> + <property> <name>dfs.federation.router.store.connection.test</name> <value>60000</value> diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md index 9d565f3c4248..ed62aec7209c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md @@ -469,6 +469,7 @@ The connection to the State Store and the internal caching at the Router. | dfs.federation.router.store.connection.test | 60000 | How often to check for the connection to the State Store in milliseconds. | | dfs.federation.router.cache.ttl | 60000 | How often to refresh the State Store caches in milliseconds. | | dfs.federation.router.store.membership.expiration | 300000 | Expiration time in milliseconds for a membership record. | +| dfs.federation.router.store.driver.async.override.max.threads | -1 | Number of threads used to overwrite and delete records asynchronously when overriding. Non-positive values make the StateStoreDriver run in sync mode. | | dfs.federation.router.mount-table.cache.update | false | If true, Mount table cache is updated whenever a mount table entry is added, modified or removed for all the routers. | | dfs.federation.router.mount-table.cache.update.timeout | 1m | Max time to wait for all the routers to finish their mount table cache update. | | dfs.federation.router.mount-table.cache.update.client.max.time | 5m | Max time a RouterClient connection can be cached. 
| diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java index d0821a1711b5..24874375e24b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java @@ -58,6 +58,7 @@ public class MockStateStoreDriver extends StateStoreBaseImpl { @Override public void close() throws Exception { + super.close(); VALUE_MAP.clear(); initialized = false; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org