This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 58572aaa73 HDDS-10020. DoubleBufferEntry should not be generic. (#5878)
58572aaa73 is described below
commit 58572aaa736bb4c9ef859020c2a4726c7f1de97a
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Dec 29 11:27:46 2023 -0800
HDDS-10020. DoubleBufferEntry should not be generic. (#5878)
---
.../ozone/om/ratis/OzoneManagerDoubleBuffer.java | 69 +++++++++++-----------
.../ozone/om/ratis/helpers/DoubleBufferEntry.java | 45 --------------
2 files changed, 36 insertions(+), 78 deletions(-)
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 678fa492ef..f20b5dad15 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -46,7 +45,6 @@ import
org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.S3SecretManager;
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
-import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry;
import
org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerDoubleBufferMetrics;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -77,6 +75,25 @@ public final class OzoneManagerDoubleBuffer {
private static final Logger LOG =
LoggerFactory.getLogger(OzoneManagerDoubleBuffer.class);
+ /** Entry for {@link #currentBuffer} and {@link #readyBuffer}. */
+ private static class Entry {
+ private final TermIndex termIndex;
+ private final OMClientResponse response;
+
+ Entry(TermIndex termIndex, OMClientResponse response) {
+ this.termIndex = termIndex;
+ this.response = response;
+ }
+
+ TermIndex getTermIndex() {
+ return termIndex;
+ }
+
+ OMClientResponse getResponse() {
+ return response;
+ }
+ }
+
// Taken unbounded queue, if sync thread is taking too long time, we
// might end up taking huge memory to add entries to the buffer.
// TODO: We can avoid this using unbounded queue and use queue with
@@ -84,8 +101,8 @@ public final class OzoneManagerDoubleBuffer {
// add entries. But in this also we might block rpc handlers, as we
// clear entries after sync. Or we can come up with a good approach to
// solve this.
- private Queue<DoubleBufferEntry<OMClientResponse>> currentBuffer;
- private Queue<DoubleBufferEntry<OMClientResponse>> readyBuffer;
+ private Queue<Entry> currentBuffer;
+ private Queue<Entry> readyBuffer;
// future objects which hold the future returned by add method.
@@ -110,7 +127,6 @@ public final class OzoneManagerDoubleBuffer {
private final boolean isTracingEnabled;
private final Semaphore unFlushedTransactions;
private final FlushNotifier flushNotifier;
- private final String threadPrefix;
private final S3SecretManager s3SecretManager;
/**
@@ -153,7 +169,7 @@ public final class OzoneManagerDoubleBuffer {
return this;
}
- public Builder setFlushNotifier(FlushNotifier flushNotifier) {
+ Builder setFlushNotifier(FlushNotifier flushNotifier) {
this.flushNotifier = flushNotifier;
return this;
}
@@ -207,7 +223,6 @@ public final class OzoneManagerDoubleBuffer {
this.ozoneManagerDoubleBufferMetrics =
OzoneManagerDoubleBufferMetrics.create();
this.flushNotifier = flushNotifier;
- this.threadPrefix = threadPrefix;
isRunning.set(true);
// Daemon thread which runs in background and flushes transactions to DB.
daemon = new Daemon(this::flushTransactions);
@@ -313,10 +328,8 @@ public final class OzoneManagerDoubleBuffer {
// Flush #3: [request3]
// Flush #4: [snapshotRequest2]
// Flush #5: [request4]
- List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues =
- splitReadyBufferAtCreateSnapshot();
-
- for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) {
+ final List<Queue<Entry>> bufferQueues =
splitReadyBufferAtCreateSnapshot();
+ for (Queue<Entry> buffer : bufferQueues) {
flushBatch(buffer);
}
@@ -329,13 +342,11 @@ public final class OzoneManagerDoubleBuffer {
}
}
- private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)
- throws IOException {
-
+ private void flushBatch(Queue<Entry> buffer) throws IOException {
Map<String, List<Long>> cleanupEpochs = new HashMap<>();
// Commit transaction info to DB.
final List<TermIndex> flushedTransactions = buffer.stream()
- .map(DoubleBufferEntry::getTermIndex)
+ .map(Entry::getTermIndex)
.sorted()
.collect(Collectors.toList());
final List<Long> flushedEpochs = flushedTransactions.stream()
@@ -395,10 +406,9 @@ public final class OzoneManagerDoubleBuffer {
updateMetrics(flushedTransactionsSize);
}
- private String addToBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer,
- BatchOperation batchOperation) {
+ private String addToBatch(Queue<Entry> buffer, BatchOperation
batchOperation) {
String lastTraceId = null;
- for (DoubleBufferEntry<OMClientResponse> entry: buffer) {
+ for (Entry entry: buffer) {
OMClientResponse response = entry.getResponse();
OMResponse omResponse = response.getOMResponse();
lastTraceId = omResponse.getTraceID();
@@ -421,7 +431,7 @@ public final class OzoneManagerDoubleBuffer {
/**
* Splits the readyBuffer around the create snapshot request.
* Returns, the list of queue split by create snapshot requests.
- *
+ * <p>
* CreateSnapshot is used as barrier because the checkpoint creation happens
* in RocksDB callback flush. If multiple operations are flushed in one
* specific batch, we are not sure at the flush of which specific operation
@@ -429,23 +439,17 @@ public final class OzoneManagerDoubleBuffer {
* There could be a possibility of race condition that is exposed to rocksDB
* behaviour for the batch.
* Hence, we treat createSnapshot as separate batch flush.
- *
+ * <p>
* e.g. requestBuffer = [request1, request2, snapshotRequest1,
* request3, snapshotRequest2, request4]
* response = [[request1, request2], [snapshotRequest1], [request3],
* [snapshotRequest2], [request4]]
*/
- private List<Queue<DoubleBufferEntry<OMClientResponse>>>
- splitReadyBufferAtCreateSnapshot() {
- List<Queue<DoubleBufferEntry<OMClientResponse>>> response =
- new ArrayList<>();
-
- Iterator<DoubleBufferEntry<OMClientResponse>> iterator =
- readyBuffer.iterator();
+ private List<Queue<Entry>> splitReadyBufferAtCreateSnapshot() {
+ final List<Queue<Entry>> response = new ArrayList<>();
OMResponse previousOmResponse = null;
- while (iterator.hasNext()) {
- DoubleBufferEntry<OMClientResponse> entry = iterator.next();
+ for (final Entry entry : readyBuffer) {
OMResponse omResponse = entry.getResponse().getOMResponse();
// New queue gets created in three conditions:
// 1. It is first element in the response,
@@ -464,8 +468,7 @@ public final class OzoneManagerDoubleBuffer {
return response;
}
- private void addCleanupEntry(DoubleBufferEntry entry, Map<String,
- List<Long>> cleanupEpochs) {
+ private void addCleanupEntry(Entry entry, Map<String, List<Long>>
cleanupEpochs) {
Class<? extends OMClientResponse> responseClass =
entry.getResponse().getClass();
CleanupTableInfo cleanupTableInfo =
@@ -598,7 +601,7 @@ public final class OzoneManagerDoubleBuffer {
* Add OmResponseBufferEntry to buffer.
*/
public synchronized CompletableFuture<Void> add(OMClientResponse response,
TermIndex termIndex) {
- currentBuffer.add(new DoubleBufferEntry<>(termIndex, response));
+ currentBuffer.add(new Entry(termIndex, response));
notify();
if (!isRatisEnabled) {
@@ -648,7 +651,7 @@ public final class OzoneManagerDoubleBuffer {
* used by sync thread to flush transactions to DB.
*/
private synchronized void swapCurrentAndReadyBuffer() {
- Queue<DoubleBufferEntry<OMClientResponse>> temp = currentBuffer;
+ final Queue<Entry> temp = currentBuffer;
currentBuffer = readyBuffer;
readyBuffer = temp;
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/DoubleBufferEntry.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/DoubleBufferEntry.java
deleted file mode 100644
index f1b06d7f97..0000000000
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/DoubleBufferEntry.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.om.ratis.helpers;
-
-import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.ratis.server.protocol.TermIndex;
-
-/**
- * Entry in OzoneManagerDouble Buffer.
- * @param <Response>
- */
-public class DoubleBufferEntry<Response extends OMClientResponse> {
-
- private final TermIndex termIndex;
- private Response response;
-
- public DoubleBufferEntry(TermIndex termIndex, Response response) {
- this.termIndex = termIndex;
- this.response = response;
- }
-
- public TermIndex getTermIndex() {
- return termIndex;
- }
-
- public Response getResponse() {
- return response;
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]