[ https://issues.apache.org/jira/browse/HDFS-16967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708118#comment-17708118 ]
ASF GitHub Bot commented on HDFS-16967:
---------------------------------------
virajjasani commented on code in PR #5523:
URL: https://github.com/apache/hadoop/pull/5523#discussion_r1156450948
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java:
##########
@@ -227,7 +270,23 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
if (metrics != null) {
metrics.addRead(monotonicNow() - start);
}
- return new QueryResult<T>(ret, getTime());
+ return new QueryResult<>(result, getTime());
+ }
+
+ private <T extends BaseRecord> Void getRecordsFromFileAndRemoveOldTmpRecords(Class<T> clazz,
Review Comment:
Done, thanks
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java:
##########
@@ -345,36 +397,71 @@ public <T extends BaseRecord> boolean putAll(
}
// Write the records
- boolean success = true;
- for (Entry<String, T> entry : toWrite.entrySet()) {
- String recordPath = entry.getKey();
- String recordPathTemp = recordPath + "." + now() + TMP_MARK;
- boolean recordWrittenSuccessfully = true;
- try (BufferedWriter writer = getWriter(recordPathTemp)) {
- T record = entry.getValue();
- String line = serializeString(record);
- writer.write(line);
- } catch (IOException e) {
- LOG.error("Cannot write {}", recordPathTemp, e);
- recordWrittenSuccessfully = false;
- success = false;
+ final AtomicBoolean success = new AtomicBoolean(true);
+ final List<Callable<Void>> callables = new ArrayList<>();
+ toWrite.entrySet().forEach(entry -> callables.add(() -> writeRecordToFile(success, entry)));
+ if (this.concurrentStoreAccessPool != null) {
+ // Write records concurrently
+ List<Future<Void>> futures = null;
+ try {
+ futures = this.concurrentStoreAccessPool.invokeAll(callables);
+ } catch (InterruptedException e) {
+ success.set(false);
+ LOG.error("Failed to put record concurrently.", e);
}
- // Commit
- if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
- LOG.error("Failed committing record into {}", recordPath);
- success = false;
+ if (futures != null) {
+ for (Future<Void> future : futures) {
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ success.set(false);
+ LOG.error("Failed to retrieve results from concurrent record put runs.", e);
+ }
+ }
}
+ } else {
+ // Write records serially
+ callables.forEach(callable -> {
+ try {
+ callable.call();
+ } catch (Exception e) {
+ success.set(false);
+ LOG.error("Failed to put record.", e);
+ }
+ });
}
long end = monotonicNow();
if (metrics != null) {
- if (success) {
+ if (success.get()) {
metrics.addWrite(end - start);
} else {
metrics.addFailure(end - start);
}
}
- return success;
+ return success.get();
+ }
+
+ private <T extends BaseRecord> Void writeRecordToFile(AtomicBoolean success,
Review Comment:
Done
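For readers following the second hunk, here is a minimal, self-contained sketch of the same write pattern: one Callable per record, run through ExecutorService.invokeAll when a pool is configured and serially otherwise, with an AtomicBoolean collecting the overall result. Each task writes to a temporary sibling file and then renames it over the target, as the removed loop did. It assumes records are already serialized to one string per file and uses java.nio.file on the local filesystem instead of the driver's getWriter()/rename() helpers; ConcurrentRecordWriter, putAll(Map<Path, String>) and deleteQuietly are hypothetical names, not the StateStoreFileBaseImpl API.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration; not the Hadoop RBF driver class.
final class ConcurrentRecordWriter {

  // Optional pool; null means "write serially" (analogous to concurrentStoreAccessPool).
  private final ExecutorService pool;

  ConcurrentRecordWriter(ExecutorService pool) {
    this.pool = pool;
  }

  /** Writes every (path -> serialized record) entry; true only if all writes commit. */
  boolean putAll(Map<Path, String> toWrite) {
    final AtomicBoolean success = new AtomicBoolean(true);
    final List<Callable<Void>> callables = new ArrayList<>();
    toWrite.forEach((path, line) -> callables.add(() -> writeRecordToFile(success, path, line)));

    if (pool != null) {
      try {
        // invokeAll blocks until every task has finished, normally or exceptionally.
        for (Future<Void> future : pool.invokeAll(callables)) {
          try {
            future.get();
          } catch (ExecutionException e) {
            success.set(false);
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        success.set(false);
      }
    } else {
      for (Callable<Void> callable : callables) {
        try {
          callable.call();
        } catch (Exception e) {
          success.set(false);
        }
      }
    }
    return success.get();
  }

  // Write to a temporary sibling file, then rename it over the target to "commit".
  private Void writeRecordToFile(AtomicBoolean success, Path path, String line) {
    Path tmp = path.resolveSibling(path.getFileName() + "." + System.currentTimeMillis() + ".tmp");
    try (BufferedWriter writer = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8)) {
      writer.write(line);
    } catch (IOException e) {
      success.set(false);
      deleteQuietly(tmp);
      return null;
    }
    try {
      Files.move(tmp, path, StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException e) {
      success.set(false);
      deleteQuietly(tmp);
    }
    return null;
  }

  private static void deleteQuietly(Path p) {
    try {
      Files.deleteIfExists(p);
    } catch (IOException ignored) {
      // best-effort cleanup of a leftover temporary file
    }
  }
}
```

Sharing one AtomicBoolean across tasks lets any failing write flip the result without stopping the others, which matches the diff's behavior of attempting every record and reporting a single pass/fail to the metrics.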
> RBF: File based state stores should allow concurrent access to the records
> --------------------------------------------------------------------------
>
> Key: HDFS-16967
> URL: https://issues.apache.org/jira/browse/HDFS-16967
> Project: Hadoop HDFS
> Issue Type: Improvement
> Reporter: Viraj Jasani
> Assignee: Viraj Jasani
> Priority: Major
> Labels: pull-request-available
>
> File-based state store implementations (StateStoreFileImpl and
> StateStoreFileSystemImpl) should allow the state store records to be read
> and updated concurrently rather than serially. Concurrent access to the
> record files on the HDFS-based store appears to improve state store cache
> loading performance by more than 10x.
> For instance, to maintain data integrity, the cache is reloaded whenever any
> mount table record is updated. This reload operation appears to gain a
> significant performance improvement from concurrent access to the mount
> table records.
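The cache-reload case in the issue description can be sketched on the read side in the same way. This is a hypothetical, self-contained example that assumes each record file holds one serialized record on its first line (as the removed write loop in the diff suggests) and reads local files with java.nio.file rather than the HDFS FileSystem API; ConcurrentRecordReader and readAll are illustrative names only.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical illustration of loading all record files concurrently.
final class ConcurrentRecordReader {

  /** Reads each file's first line in parallel; result order matches the input order. */
  static List<String> readAll(ExecutorService pool, List<Path> files)
      throws InterruptedException, ExecutionException {
    List<Callable<String>> tasks = new ArrayList<>();
    for (Path file : files) {
      tasks.add(() -> readRecord(file));
    }
    List<String> records = new ArrayList<>();
    // invokeAll returns futures in the same order as the task list.
    for (Future<String> future : pool.invokeAll(tasks)) {
      records.add(future.get()); // a failed read surfaces as ExecutionException
    }
    return records;
  }

  // Assumption: one serialized record per file, stored on the first line.
  private static String readRecord(Path file) throws IOException {
    List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
    return lines.isEmpty() ? "" : lines.get(0);
  }
}
```

Unlike the write path in the diff, which records failures in an AtomicBoolean and keeps going, this sketch lets the first failed read abort the load via ExecutionException; which behavior suits a cache reload is a design choice the issue does not specify.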
--
This message was sent by Atlassian Jira
(v8.20.10#820010)