This is an automated email from the ASF dual-hosted git repository.

zanderxu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
     new f5c5d35eb01d HDFS-17529. RBF: Improve router state store cache entry deletion (#6833)
f5c5d35eb01d is described below

commit f5c5d35eb01dbaebe2f8413bd1cabc727aa7a133
Author: Felix Nguyen <23214709+kokonguyen...@users.noreply.github.com>
AuthorDate: Fri May 24 09:41:08 2024 +0800

    HDFS-17529. RBF: Improve router state store cache entry deletion (#6833)
---
 .../server/federation/store/CachedRecordStore.java | 20 +++----
 .../store/driver/StateStoreRecordOperations.java   | 25 +++++++++
 .../store/driver/impl/StateStoreBaseImpl.java      | 36 +++++++++++++
 .../store/driver/impl/StateStoreZooKeeperImpl.java | 63 +++++++++++++++++-----
 .../federation/store/driver/TestStateStoreZK.java  | 21 ++++++--
 5 files changed, 138 insertions(+), 27 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
index 59da6145352a..3a2995eba2a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -172,7 +173,7 @@ public abstract class CachedRecordStore<R extends BaseRecord>
    */
  public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
    List<R> commitRecords = new ArrayList<>();
-    List<R> deleteRecords = new ArrayList<>();
+    List<R> toDeleteRecords = new ArrayList<>();
    List<R> newRecords = query.getRecords();
    long currentDriverTime = query.getTimestamp();
    if (newRecords == null || currentDriverTime <= 0) {
@@ -182,13 +183,8 @@ public abstract class CachedRecordStore<R extends BaseRecord>
    for (R record : newRecords) {
      if (record.shouldBeDeleted(currentDriverTime)) {
        String recordName = StateStoreUtils.getRecordName(record.getClass());
-        if (getDriver().remove(record)) {
-          deleteRecords.add(record);
-          LOG.info("Deleted State Store record {}: {}", recordName, record);
-        } else {
-          LOG.warn("Couldn't delete State Store record {}: {}", recordName,
-              record);
-        }
+        LOG.info("State Store record to delete {}: {}", recordName, record);
+        toDeleteRecords.add(record);
      } else if (!record.isExpired() && record.checkExpired(currentDriverTime)) {
        String recordName = StateStoreUtils.getRecordName(record.getClass());
        LOG.info("Override State Store record {}: {}", recordName, record);
@@ -198,8 +194,12 @@ public abstract class CachedRecordStore<R extends BaseRecord>
    if (commitRecords.size() > 0) {
      getDriver().putAll(commitRecords, true, false);
    }
-    if (deleteRecords.size() > 0) {
-      newRecords.removeAll(deleteRecords);
+    if (!toDeleteRecords.isEmpty()) {
+      for (Map.Entry<R, Boolean> entry : getDriver().removeMultiple(toDeleteRecords).entrySet()) {
+        if (entry.getValue()) {
+          newRecords.remove(entry.getKey());
+        }
+      }
    }
  }
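For context on the driver contract used above: a minimal caller-side sketch (hypothetical, not part of this patch) of how the Map<T, Boolean> returned by removeMultiple() is meant to be consumed, mirroring the loop overrideExpiredRecords() now uses.

import java.util.List;
import java.util.Map;

// Hypothetical sketch: prune only the records the driver confirms as deleted.
public final class RemoveMultipleUsage {
  static <T> void pruneRemoved(List<T> freshRecords, Map<T, Boolean> removals) {
    for (Map.Entry<T, Boolean> entry : removals.entrySet()) {
      if (entry.getValue()) {
        // The store deleted this record; drop it from the refreshed list.
        freshRecords.remove(entry.getKey());
      }
      // On false the record stays, so a failed deletion is retried on the
      // next cache refresh instead of silently disappearing.
    }
  }
}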
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
index 716f41daf4dd..97f6c680a4b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.federation.store.driver;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -127,6 +128,17 @@ public interface StateStoreRecordOperations {
  @AtMostOnce
  <T extends BaseRecord> boolean remove(T record) throws IOException;
 
+  /**
+   * Remove multiple records.
+   *
+   * @param <T> Record class of the records.
+   * @param records Records to be removed.
+   * @return Map of record to a boolean indicating if the record has been removed successfully.
+   * @throws IOException Throws exception if unable to query the data store.
+   */
+  @AtMostOnce
+  <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException;
+
  /**
   * Remove all records of this class from the store.
   *
@@ -152,4 +164,17 @@ public interface StateStoreRecordOperations {
  <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
      throws IOException;
 
+  /**
+   * Remove all records of a specific class that match any query in a list of queries.
+   * Requires the getAll implementation to fetch fresh records on each call.
+   *
+   * @param clazz The class to match the records with.
+   * @param queries Queries (logical OR) to filter what to remove.
+   * @param <T> Record class of the records.
+   * @return Map of query to number of records removed by that query.
+   * @throws IOException Throws exception if unable to query the data store.
+   */
+  @AtMostOnce
+  <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz, List<Query<T>> queries)
+      throws IOException;
 }
\ No newline at end of file
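A usage sketch of the new OR-semantics bulk remove declared above. This is illustrative only: the driver instance and the two partial MembershipState records are placeholders, and the import paths are assumed from the patch context rather than confirmed by it.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;

// Hypothetical caller: delete every record matching EITHER partial record.
public final class BulkRemoveExample {
  static Map<Query<MembershipState>, Integer> removeEither(StateStoreDriver driver,
      MembershipState partialA, MembershipState partialB) throws IOException {
    List<Query<MembershipState>> queries =
        Arrays.asList(new Query<>(partialA), new Query<>(partialB));
    // The result maps each query to the number of records it removed.
    return driver.remove(MembershipState.class, queries);
  }
}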
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
index df3ce21dee27..93ad279e187c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
@@ -21,7 +21,10 @@ import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.fil
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -86,4 +89,37 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver {
    Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
    return remove(recordClass, query) == 1;
  }
+
+  @Override
+  public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T> records) throws IOException {
+    assert !records.isEmpty();
+    // Fall back to iterative remove() calls if the records don't all share one class
+    Class<? extends BaseRecord> expectedClazz = records.get(0).getClass();
+    if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) {
+      Map<T, Boolean> result = new HashMap<>();
+      for (T record : records) {
+        result.put(record, remove(record));
+      }
+      return result;
+    }
+
+    final List<Query<T>> queries = new ArrayList<>();
+    for (T record : records) {
+      queries.add(new Query<>(record));
+    }
+    @SuppressWarnings("unchecked")
+    Class<T> recordClass = (Class<T>) StateStoreUtils.getRecordClass(expectedClazz);
+    Map<Query<T>, Integer> result = remove(recordClass, queries);
+    return result.entrySet().stream()
+        .collect(Collectors.toMap(e -> e.getKey().getPartial(), e -> e.getValue() == 1));
+  }
+
+  public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
+      List<Query<T>> queries) throws IOException {
+    Map<Query<T>, Integer> result = new HashMap<>();
+    for (Query<T> query : queries) {
+      result.put(query, remove(clazz, query));
+    }
+    return result;
+  }
 }
\ No newline at end of file
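The base implementation above folds the per-query removal counts back into per-record success flags with Collectors.toMap, treating "removed exactly one row" as success. A standalone, runnable sketch of that idiom, with simplified placeholder types (String keys instead of Query<T>):

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Self-contained illustration of the count-to-flag conversion.
public final class ToMapIdiom {
  public static void main(String[] args) {
    Map<String, Integer> removedPerQuery = new HashMap<>();
    removedPerQuery.put("record-a", 1);  // deleted exactly one row -> success
    removedPerQuery.put("record-b", 0);  // deleted nothing -> failure
    Map<String, Boolean> success = removedPerQuery.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == 1));
    System.out.println(success);  // {record-a=true, record-b=false}
  }
}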
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
index 19a23cb0225a..0e72cf417565 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
@@ -25,7 +25,11 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -284,38 +288,47 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
  }
 
  @Override
-  public <T extends BaseRecord> int remove(
-      Class<T> clazz, Query<T> query) throws IOException {
+  public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
+      List<Query<T>> queries) throws IOException {
    verifyDriverReady();
-    if (query == null) {
-      return 0;
+    // Track how many entries are deleted by each query
+    Map<Query<T>, Integer> ret = new HashMap<>();
+    final List<T> trueRemoved = Collections.synchronizedList(new ArrayList<>());
+    if (queries.isEmpty()) {
+      return ret;
    }
 
    // Read the current data
    long start = monotonicNow();
-    List<T> records = null;
+    List<T> records;
    try {
      QueryResult<T> result = get(clazz);
      records = result.getRecords();
    } catch (IOException ex) {
      LOG.error("Cannot get existing records", ex);
      getMetrics().addFailure(monotonicNow() - start);
-      return 0;
+      return ret;
    }
 
    // Check the records to remove
    String znode = getZNodeForClass(clazz);
-    List<T> recordsToRemove = filterMultiple(query, records);
+    Set<T> recordsToRemove = new HashSet<>();
+    Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
+    for (Query<T> query : queries) {
+      List<T> filtered = filterMultiple(query, records);
+      queryToRecords.put(query, filtered);
+      recordsToRemove.addAll(filtered);
+    }
 
    // Remove the records
-    int removed = 0;
-    for (T existingRecord : recordsToRemove) {
+    List<Callable<Void>> callables = new ArrayList<>();
+    recordsToRemove.forEach(existingRecord -> callables.add(() -> {
      LOG.info("Removing \"{}\"", existingRecord);
      try {
        String primaryKey = getPrimaryKey(existingRecord);
        String path = getNodePath(znode, primaryKey);
        if (zkManager.delete(path)) {
-          removed++;
+          trueRemoved.add(existingRecord);
        } else {
          LOG.error("Did not remove \"{}\"", existingRecord);
        }
@@ -323,12 +336,38 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
        LOG.error("Cannot remove \"{}\"", existingRecord, e);
        getMetrics().addFailure(monotonicNow() - start);
      }
+      return null;
+    }));
+    try {
+      if (enableConcurrent) {
+        executorService.invokeAll(callables);
+      } else {
+        for (Callable<Void> callable : callables) {
+          callable.call();
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Record removal failed : {}", e.getMessage(), e);
+    }
    long end = monotonicNow();
-    if (removed > 0) {
+    if (!trueRemoved.isEmpty()) {
      getMetrics().addRemove(end - start);
    }
-    return removed;
+    // Generate return map
+    for (Map.Entry<Query<T>, List<T>> entry : queryToRecords.entrySet()) {
+      for (T record : entry.getValue()) {
+        if (trueRemoved.contains(record)) {
+          ret.compute(entry.getKey(), (k, v) -> (v == null) ? 1 : v + 1);
+        }
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
+      throws IOException {
+    return remove(clazz, Collections.singletonList(query)).get(query);
  }
 
  @Override
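The ZooKeeper driver above wraps each znode deletion in a Callable and submits the whole batch with invokeAll() when enableConcurrent is set, falling back to sequential call() otherwise. A self-contained sketch of that fan-out pattern; the paths and the fixed-size pool here are placeholders standing in for real znode paths and the driver's executorService:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: the println stands in for zkManager.delete(path).
public final class InvokeAllPattern {
  public static void main(String[] args) throws InterruptedException {
    List<String> paths = Arrays.asList("/ns/record-1", "/ns/record-2");
    List<Callable<Void>> callables = new ArrayList<>();
    for (String path : paths) {
      callables.add(() -> {
        System.out.println("would delete znode " + path);
        return null;
      });
    }
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.invokeAll(callables);  // blocks until every Callable completes
    pool.shutdown();
  }
}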
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java
index f94e415b4d51..5ddf93e05b52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java
@@ -140,17 +140,28 @@ public class TestStateStoreZK extends TestStateStoreDriverBase {
      insertList.add(newRecord);
    }
 
    // Insert Multiple on sync mode
-    long startSync = Time.now();
+    long startSyncPut = Time.now();
    stateStoreDriver.putAll(insertList, true, false);
-    long endSync = Time.now();
+    long endSyncPut = Time.now();
+    // Removing 1000 records synchronously is painfully slow so test with only 5 records
+    // Then remove the rest with removeAll()
+    long startSyncRemove = Time.now();
+    for (MountTable entry : insertList.subList(0, 5)) {
+      stateStoreDriver.remove(entry);
+    }
+    long endSyncRemove = Time.now();
    stateStoreDriver.removeAll(MembershipState.class);
 
    stateStoreDriver.setEnableConcurrent(true);
    // Insert Multiple on async mode
-    long startAsync = Time.now();
+    long startAsyncPut = Time.now();
    stateStoreDriver.putAll(insertList, true, false);
-    long endAsync = Time.now();
-    assertTrue((endSync - startSync) > (endAsync - startAsync));
+    long endAsyncPut = Time.now();
+    long startAsyncRemove = Time.now();
+    stateStoreDriver.removeMultiple(insertList.subList(0, 5));
+    long endAsyncRemove = Time.now();
+    assertTrue((endSyncPut - startSyncPut) > (endAsyncPut - startAsyncPut));
+    assertTrue((endSyncRemove - startSyncRemove) > (endAsyncRemove - startAsyncRemove));
  }
 
  @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org