This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 6169dc7e9a HDDS-10022. Remove current/readyFutureQueue from
OzoneManagerDoubleBuffer. (#5881)
6169dc7e9a is described below
commit 6169dc7e9a2a786ec5bddf857667cd71da89618a
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Dec 30 08:41:22 2023 -0800
HDDS-10022. Remove current/readyFutureQueue from OzoneManagerDoubleBuffer.
(#5881)
---
.../ozone/om/ratis/OzoneManagerDoubleBuffer.java | 46 +++-------------------
.../ozone/om/ratis/OzoneManagerStateMachine.java | 3 +-
.../hadoop/ozone/om/response/OMClientResponse.java | 24 +++--------
.../protocolPB/OzoneManagerRequestHandler.java | 3 +-
4 files changed, 14 insertions(+), 62 deletions(-)
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index f20b5dad15..1b4e3cafc4 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -104,15 +104,6 @@ public final class OzoneManagerDoubleBuffer {
private Queue<Entry> currentBuffer;
private Queue<Entry> readyBuffer;
-
- // future objects which hold the future returned by add method.
- private volatile Queue<CompletableFuture<Void>> currentFutureQueue;
-
- // Once we have an entry in current buffer, we swap the currentFutureQueue
- // with readyFutureQueue. After flush is completed in flushTransaction
- // daemon thread, we complete the futures in readyFutureQueue and clear them.
- private volatile Queue<CompletableFuture<Void>> readyFutureQueue;
-
private final Daemon daemon;
private final OMMetadataManager omMetadataManager;
private final AtomicLong flushedTransactionCount = new AtomicLong(0);
@@ -213,10 +204,6 @@ public final class OzoneManagerDoubleBuffer {
this.readyBuffer = new ConcurrentLinkedQueue<>();
this.isRatisEnabled = isRatisEnabled;
this.isTracingEnabled = isTracingEnabled;
- if (!isRatisEnabled) {
- this.currentFutureQueue = new ConcurrentLinkedQueue<>();
- this.readyFutureQueue = new ConcurrentLinkedQueue<>();
- }
this.unFlushedTransactions = new Semaphore(maxUnFlushedTransactions);
this.omMetadataManager = omMetadataManager;
this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
@@ -380,7 +367,10 @@ public final class OzoneManagerDoubleBuffer {
// Complete futures first and then do other things.
// So that handler threads will be released.
if (!isRatisEnabled) {
- clearReadyFutureQueue(buffer.size());
+ buffer.stream()
+ .map(Entry::getResponse)
+ .map(OMClientResponse::getFlushFuture)
+ .forEach(f -> f.complete(null));
}
int flushedTransactionsSize = buffer.size();
@@ -495,17 +485,6 @@ public final class OzoneManagerDoubleBuffer {
}
}
- /**
- * Completes futures for first count element form the readyFutureQueue
- * so that handler thread can be released asap.
- */
- private void clearReadyFutureQueue(int count) {
- while (!readyFutureQueue.isEmpty() && count > 0) {
- readyFutureQueue.remove().complete(null);
- count--;
- }
- }
-
private void cleanupCache(Map<String, List<Long>> cleanupEpochs) {
cleanupEpochs.forEach((tableName, epochs) -> {
Collections.sort(epochs);
@@ -600,18 +579,12 @@ public final class OzoneManagerDoubleBuffer {
/**
* Add OmResponseBufferEntry to buffer.
*/
- public synchronized CompletableFuture<Void> add(OMClientResponse response,
TermIndex termIndex) {
+ public synchronized void add(OMClientResponse response, TermIndex termIndex)
{
currentBuffer.add(new Entry(termIndex, response));
notify();
if (!isRatisEnabled) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- currentFutureQueue.add(future);
- return future;
- } else {
- // In Non-HA case we don't need future to be returned, and this return
- // status is not used.
- return null;
+ response.setFlushFuture(new CompletableFuture<>());
}
}
@@ -654,13 +627,6 @@ public final class OzoneManagerDoubleBuffer {
final Queue<Entry> temp = currentBuffer;
currentBuffer = readyBuffer;
readyBuffer = temp;
-
- if (!isRatisEnabled) {
- // Swap future queue.
- Queue<CompletableFuture<Void>> tempFuture = currentFutureQueue;
- currentFutureQueue = readyFutureQueue;
- readyFutureQueue = tempFuture;
- }
}
@VisibleForTesting
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index ee01ecedb8..a25261ded9 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -610,8 +610,7 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
}
OMResponse omResponse = omResponseBuilder.build();
OMClientResponse omClientResponse = new DummyOMClientResponse(omResponse);
- omClientResponse.setFlushFuture(
- ozoneManagerDoubleBuffer.add(omClientResponse, termIndex));
+ ozoneManagerDoubleBuffer.add(omClientResponse, termIndex);
return omResponse;
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java
index 2a8af15b6b..6e300f2e03 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java
@@ -25,25 +25,20 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .OMResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Interface for OM Responses, each OM response should implement this
interface.
*/
public abstract class OMClientResponse {
- private OMResponse omResponse;
+ private final OMResponse omResponse;
+ /** Used only for non-Ratis. */
private CompletableFuture<Void> flushFuture = null;
private OMLockDetails omLockDetails;
- private static final Logger LOG =
- LoggerFactory.getLogger(OMClientResponse.class);
-
public OMClientResponse(OMResponse omResponse) {
Preconditions.checkNotNull(omResponse);
this.omResponse = omResponse;
@@ -57,21 +52,17 @@ public abstract class OMClientResponse {
* For error case, check that the status of omResponse is not OK.
*/
public void checkStatusNotOK() {
- Preconditions.checkArgument(!omResponse.getStatus().equals(
- OzoneManagerProtocolProtos.Status.OK));
+ Preconditions.checkArgument(!omResponse.getStatus().equals(Status.OK));
}
/**
* Check if omResponse status is OK. If yes, add to DB.
* For OmResponse with failure, this should do nothing. This method is not
* called in failure scenario in OM code.
- * @param omMetadataManager
- * @param batchOperation
- * @throws IOException
*/
public void checkAndUpdateDB(OMMetadataManager omMetadataManager,
BatchOperation batchOperation) throws IOException {
- if (omResponse.getStatus() == OzoneManagerProtocolProtos.Status.OK) {
+ if (omResponse.getStatus() == Status.OK) {
addToDBBatch(omMetadataManager, batchOperation);
}
}
@@ -79,9 +70,6 @@ public abstract class OMClientResponse {
/**
* Implement logic to add the response to batch. This function should be
* called from checkAndUpdateDB only.
- * @param omMetadataManager
- * @param batchOperation
- * @throws IOException
*/
protected abstract void addToDBBatch(OMMetadataManager omMetadataManager,
BatchOperation batchOperation) throws IOException;
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 818f61c19a..d96637b461 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -404,8 +404,7 @@ public class OzoneManagerRequestHandler implements
RequestHandler {
Preconditions.checkNotNull(omClientResponse,
"omClientResponse returned by validateAndUpdateCache cannot be
null");
if (omRequest.getCmdType() != Type.Prepare) {
- omClientResponse.setFlushFuture(
- ozoneManagerDoubleBuffer.add(omClientResponse, termIndex));
+ ozoneManagerDoubleBuffer.add(omClientResponse, termIndex);
}
return omClientResponse;
});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]