simbadzina commented on code in PR #5523:
URL: https://github.com/apache/hadoop/pull/5523#discussion_r1156417874
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java:
##########
@@ -227,7 +270,23 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
if (metrics != null) {
metrics.addRead(monotonicNow() - start);
}
- return new QueryResult<T>(ret, getTime());
+ return new QueryResult<>(result, getTime());
+ }
+
+ private <T extends BaseRecord> Void getRecordsFromFileAndRemoveOldTmpRecords(Class<T> clazz,
Review Comment:
Can you add documentation to this function indicating that the results list
is being modified in place to collect the results? Changing the function name
would make that clearer too.
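For example, a rough sketch of the kind of Javadoc I mean (the wording is a
placeholder, adjust it to what the method actually does):
```
/**
 * Reads records of the given class from the store and appends them to the
 * caller-supplied results list, i.e. the list passed in is modified in place
 * as a side effect to collect the output, rather than a new list being
 * returned. Stale temporary record files found while reading are removed.
 */
```
A name along the lines of `collectRecordsAndRemoveOldTmpRecords` would also
make the in-place collection explicit, but I'll leave the exact name to you.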
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java:
##########
@@ -345,36 +397,71 @@ public <T extends BaseRecord> boolean putAll(
}
// Write the records
- boolean success = true;
- for (Entry<String, T> entry : toWrite.entrySet()) {
- String recordPath = entry.getKey();
- String recordPathTemp = recordPath + "." + now() + TMP_MARK;
- boolean recordWrittenSuccessfully = true;
- try (BufferedWriter writer = getWriter(recordPathTemp)) {
- T record = entry.getValue();
- String line = serializeString(record);
- writer.write(line);
- } catch (IOException e) {
- LOG.error("Cannot write {}", recordPathTemp, e);
- recordWrittenSuccessfully = false;
- success = false;
+ final AtomicBoolean success = new AtomicBoolean(true);
+ final List<Callable<Void>> callables = new ArrayList<>();
+ toWrite.entrySet().forEach(entry -> callables.add(() -> writeRecordToFile(success, entry)));
+ if (this.concurrentStoreAccessPool != null) {
+ // Write records concurrently
+ List<Future<Void>> futures = null;
+ try {
+ futures = this.concurrentStoreAccessPool.invokeAll(callables);
+ } catch (InterruptedException e) {
+ success.set(false);
+ LOG.error("Failed to put record concurrently.", e);
}
- // Commit
- if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
- LOG.error("Failed committing record into {}", recordPath);
- success = false;
+ if (futures != null) {
+ for (Future<Void> future : futures) {
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ success.set(false);
+ LOG.error("Failed to retrieve results from concurrent record put
runs.", e);
+ }
+ }
}
+ } else {
+ // Write records serially
+ callables.forEach(callable -> {
+ try {
+ callable.call();
+ } catch (Exception e) {
+ success.set(false);
+ LOG.error("Failed to put record.", e);
+ }
+ });
}
long end = monotonicNow();
if (metrics != null) {
- if (success) {
+ if (success.get()) {
metrics.addWrite(end - start);
} else {
metrics.addFailure(end - start);
}
}
- return success;
+ return success.get();
+ }
+
+ private <T extends BaseRecord> Void writeRecordToFile(AtomicBoolean success,
Review Comment:
Similar comment as above. Can we add documentation indicating that `success`
is being modified?
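For instance, something along these lines (sketch only, adjust to the actual
behavior of the method):
```
/**
 * Serializes the given record to a temporary file and commits it by renaming
 * the temporary file onto the final record path. On any failure the shared
 * success flag passed in is set to false as a side effect; it is never set
 * back to true here, so callers can aggregate the outcome of many concurrent
 * writes.
 */
```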
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java:
##########
@@ -137,6 +149,8 @@ public abstract <T extends BaseRecord> BufferedWriter getWriter(
*/
protected abstract String getRootDir();
+ protected abstract int getConcurrentFilesAccessNumThreads();
Review Comment:
Can we provide an implementation here and then just have one set of configs
instead of the following two?
```
FEDERATION_STORE_PREFIX + "driver.file.async.threads";
FEDERATION_STORE_PREFIX + "driver.fs.async.threads";
```
I'm okay with keeping them separate though, if you prefer that.
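Rough sketch of what I had in mind, with a single shared key. The key name
and the default are placeholders rather than existing constants, and I'm
assuming the base class can reach its `Configuration` (e.g. via a `getConf()`
accessor):
```
// In StateStoreFileBaseImpl, a concrete default instead of the abstract method
protected int getConcurrentFilesAccessNumThreads() {
  // Placeholder key shared by the file and fs drivers; 0 could mean
  // "write serially", matching the null-pool branch in putAll().
  return getConf().getInt(
      FEDERATION_STORE_PREFIX + "driver.async.threads", 0);
}
```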
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]