[ 
https://issues.apache.org/jira/browse/HDFS-16967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708106#comment-17708106
 ] 

ASF GitHub Bot commented on HDFS-16967:
---------------------------------------

simbadzina commented on code in PR #5523:
URL: https://github.com/apache/hadoop/pull/5523#discussion_r1156417874


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java:
##########
@@ -227,7 +270,23 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> 
clazz)
     if (metrics != null) {
       metrics.addRead(monotonicNow() - start);
     }
-    return new QueryResult<T>(ret, getTime());
+    return new QueryResult<>(result, getTime());
+  }
+
+  private <T extends BaseRecord> Void 
getRecordsFromFileAndRemoveOldTmpRecords(Class<T> clazz,

Review Comment:
   Can you add documentation to this function indication that the results list 
is being modified to collect the results.
   Changing the function name would make that clearer too.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java:
##########
@@ -345,36 +397,71 @@ public <T extends BaseRecord> boolean putAll(
     }
 
     // Write the records
-    boolean success = true;
-    for (Entry<String, T> entry : toWrite.entrySet()) {
-      String recordPath = entry.getKey();
-      String recordPathTemp = recordPath + "." + now() + TMP_MARK;
-      boolean recordWrittenSuccessfully = true;
-      try (BufferedWriter writer = getWriter(recordPathTemp)) {
-        T record = entry.getValue();
-        String line = serializeString(record);
-        writer.write(line);
-      } catch (IOException e) {
-        LOG.error("Cannot write {}", recordPathTemp, e);
-        recordWrittenSuccessfully = false;
-        success = false;
+    final AtomicBoolean success = new AtomicBoolean(true);
+    final List<Callable<Void>> callables = new ArrayList<>();
+    toWrite.entrySet().forEach(entry -> callables.add(() -> 
writeRecordToFile(success, entry)));
+    if (this.concurrentStoreAccessPool != null) {
+      // Write records concurrently
+      List<Future<Void>> futures = null;
+      try {
+        futures = this.concurrentStoreAccessPool.invokeAll(callables);
+      } catch (InterruptedException e) {
+        success.set(false);
+        LOG.error("Failed to put record concurrently.", e);
       }
-      // Commit
-      if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
-        LOG.error("Failed committing record into {}", recordPath);
-        success = false;
+      if (futures != null) {
+        for (Future<Void> future : futures) {
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            success.set(false);
+            LOG.error("Failed to retrieve results from concurrent record put 
runs.", e);
+          }
+        }
       }
+    } else {
+      // Write records serially
+      callables.forEach(callable -> {
+        try {
+          callable.call();
+        } catch (Exception e) {
+          success.set(false);
+          LOG.error("Failed to put record.", e);
+        }
+      });
     }
 
     long end = monotonicNow();
     if (metrics != null) {
-      if (success) {
+      if (success.get()) {
         metrics.addWrite(end - start);
       } else {
         metrics.addFailure(end - start);
       }
     }
-    return success;
+    return success.get();
+  }
+
+  private <T extends BaseRecord> Void writeRecordToFile(AtomicBoolean success,

Review Comment:
   Similar comment as above. Can we add documentation indicating `success` is 
being modified.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java:
##########
@@ -137,6 +149,8 @@ public abstract <T extends BaseRecord> BufferedWriter 
getWriter(
    */
   protected abstract String getRootDir();
 
+  protected abstract int getConcurrentFilesAccessNumThreads();

Review Comment:
   Can we provide an implementation here and then just have one set of configs 
for the following two
   ```
   FEDERATION_STORE_PREFIX + "driver.file.async.threads";
   FEDERATION_STORE_PREFIX + "driver.fs.async.threads";
   ```
   I'm okay with keeping them separate though if you have prefer that.





> RBF: File based state stores should allow concurrent access to the records
> --------------------------------------------------------------------------
>
>                 Key: HDFS-16967
>                 URL: https://issues.apache.org/jira/browse/HDFS-16967
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>            Reporter: Viraj Jasani
>            Assignee: Viraj Jasani
>            Priority: Major
>              Labels: pull-request-available
>
> File based state store implementations (StateStoreFileImpl and 
> StateStoreFileSystemImpl) should allow updating as well as reading of the 
> state store records concurrently rather than serially. Concurrent access to 
> the record files on the hdfs based store seems to be improving the state 
> store cache loading performance by more than 10x.
> For instance, in order to maintain data integrity, when any mount table 
> record(s) is updated, the cache is reloaded. This reload operation seems to 
> be able to gain significant performance improvement by the concurrent access 
> of the mount table records.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to