This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 58572aaa73 HDDS-10020. DoubleBufferEntry should not be generic. (#5878)
58572aaa73 is described below

commit 58572aaa736bb4c9ef859020c2a4726c7f1de97a
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Dec 29 11:27:46 2023 -0800

    HDDS-10020. DoubleBufferEntry should not be generic. (#5878)
---
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   | 69 +++++++++++-----------
 .../ozone/om/ratis/helpers/DoubleBufferEntry.java  | 45 --------------
 2 files changed, 36 insertions(+), 78 deletions(-)

diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 678fa492ef..f20b5dad15 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -46,7 +45,6 @@ import 
org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.S3SecretManager;
 import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
-import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry;
 import 
org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerDoubleBufferMetrics;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -77,6 +75,25 @@ public final class OzoneManagerDoubleBuffer {
   private static final Logger LOG =
       LoggerFactory.getLogger(OzoneManagerDoubleBuffer.class);
 
+  /** Entry for {@link #currentBuffer} and {@link #readyBuffer}. */
+  private static class Entry {
+    private final TermIndex termIndex;
+    private final OMClientResponse response;
+
+    Entry(TermIndex termIndex, OMClientResponse response) {
+      this.termIndex = termIndex;
+      this.response = response;
+    }
+
+    TermIndex getTermIndex() {
+      return termIndex;
+    }
+
+    OMClientResponse getResponse() {
+      return response;
+    }
+  }
+
   // Taken unbounded queue, if sync thread is taking too long time, we
   // might end up taking huge memory to add entries to the buffer.
   // TODO: We can avoid this using unbounded queue and use queue with
@@ -84,8 +101,8 @@ public final class OzoneManagerDoubleBuffer {
   // add entries. But in this also we might block rpc handlers, as we
   // clear entries after sync. Or we can come up with a good approach to
   // solve this.
-  private Queue<DoubleBufferEntry<OMClientResponse>> currentBuffer;
-  private Queue<DoubleBufferEntry<OMClientResponse>> readyBuffer;
+  private Queue<Entry> currentBuffer;
+  private Queue<Entry> readyBuffer;
 
 
   // future objects which hold the future returned by add method.
@@ -110,7 +127,6 @@ public final class OzoneManagerDoubleBuffer {
   private final boolean isTracingEnabled;
   private final Semaphore unFlushedTransactions;
   private final FlushNotifier flushNotifier;
-  private final String threadPrefix;
   private final S3SecretManager s3SecretManager;
 
   /**
@@ -153,7 +169,7 @@ public final class OzoneManagerDoubleBuffer {
       return this;
     }
 
-    public Builder setFlushNotifier(FlushNotifier flushNotifier) {
+    Builder setFlushNotifier(FlushNotifier flushNotifier) {
       this.flushNotifier = flushNotifier;
       return this;
     }
@@ -207,7 +223,6 @@ public final class OzoneManagerDoubleBuffer {
     this.ozoneManagerDoubleBufferMetrics =
         OzoneManagerDoubleBufferMetrics.create();
     this.flushNotifier = flushNotifier;
-    this.threadPrefix = threadPrefix;
     isRunning.set(true);
     // Daemon thread which runs in background and flushes transactions to DB.
     daemon = new Daemon(this::flushTransactions);
@@ -313,10 +328,8 @@ public final class OzoneManagerDoubleBuffer {
       // Flush #3: [request3]
       // Flush #4: [snapshotRequest2]
       // Flush #5: [request4]
-      List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues =
-          splitReadyBufferAtCreateSnapshot();
-
-      for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) {
+      final List<Queue<Entry>> bufferQueues = 
splitReadyBufferAtCreateSnapshot();
+      for (Queue<Entry> buffer : bufferQueues) {
         flushBatch(buffer);
       }
 
@@ -329,13 +342,11 @@ public final class OzoneManagerDoubleBuffer {
     }
   }
 
-  private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)
-      throws IOException {
-
+  private void flushBatch(Queue<Entry> buffer) throws IOException {
     Map<String, List<Long>> cleanupEpochs = new HashMap<>();
     // Commit transaction info to DB.
     final List<TermIndex> flushedTransactions = buffer.stream()
-        .map(DoubleBufferEntry::getTermIndex)
+        .map(Entry::getTermIndex)
         .sorted()
         .collect(Collectors.toList());
     final List<Long> flushedEpochs = flushedTransactions.stream()
@@ -395,10 +406,9 @@ public final class OzoneManagerDoubleBuffer {
     updateMetrics(flushedTransactionsSize);
   }
 
-  private String addToBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer,
-                            BatchOperation batchOperation) {
+  private String addToBatch(Queue<Entry> buffer, BatchOperation 
batchOperation) {
     String lastTraceId = null;
-    for (DoubleBufferEntry<OMClientResponse> entry: buffer) {
+    for (Entry entry: buffer) {
       OMClientResponse response = entry.getResponse();
       OMResponse omResponse = response.getOMResponse();
       lastTraceId = omResponse.getTraceID();
@@ -421,7 +431,7 @@ public final class OzoneManagerDoubleBuffer {
   /**
    * Splits the readyBuffer around the create snapshot request.
    * Returns, the list of queue split by create snapshot requests.
-   *
+   * <p>
    * CreateSnapshot is used as barrier because the checkpoint creation happens
    * in RocksDB callback flush. If multiple operations are flushed in one
    * specific batch, we are not sure at the flush of which specific operation
@@ -429,23 +439,17 @@ public final class OzoneManagerDoubleBuffer {
    * There could be a possibility of race condition that is exposed to rocksDB
    * behaviour for the batch.
    * Hence, we treat createSnapshot as separate batch flush.
-   *
+   * <p>
    * e.g. requestBuffer = [request1, request2, snapshotRequest1,
    * request3, snapshotRequest2, request4]
    * response = [[request1, request2], [snapshotRequest1], [request3],
    * [snapshotRequest2], [request4]]
    */
-  private List<Queue<DoubleBufferEntry<OMClientResponse>>>
-      splitReadyBufferAtCreateSnapshot() {
-    List<Queue<DoubleBufferEntry<OMClientResponse>>> response =
-        new ArrayList<>();
-
-    Iterator<DoubleBufferEntry<OMClientResponse>> iterator =
-        readyBuffer.iterator();
+  private List<Queue<Entry>> splitReadyBufferAtCreateSnapshot() {
+    final List<Queue<Entry>> response = new ArrayList<>();
 
     OMResponse previousOmResponse = null;
-    while (iterator.hasNext()) {
-      DoubleBufferEntry<OMClientResponse> entry = iterator.next();
+    for (final Entry entry : readyBuffer) {
       OMResponse omResponse = entry.getResponse().getOMResponse();
       // New queue gets created in three conditions:
       // 1. It is first element in the response,
@@ -464,8 +468,7 @@ public final class OzoneManagerDoubleBuffer {
     return response;
   }
 
-  private void addCleanupEntry(DoubleBufferEntry entry, Map<String,
-      List<Long>> cleanupEpochs) {
+  private void addCleanupEntry(Entry entry, Map<String, List<Long>> 
cleanupEpochs) {
     Class<? extends OMClientResponse> responseClass =
         entry.getResponse().getClass();
     CleanupTableInfo cleanupTableInfo =
@@ -598,7 +601,7 @@ public final class OzoneManagerDoubleBuffer {
    * Add OmResponseBufferEntry to buffer.
    */
   public synchronized CompletableFuture<Void> add(OMClientResponse response, 
TermIndex termIndex) {
-    currentBuffer.add(new DoubleBufferEntry<>(termIndex, response));
+    currentBuffer.add(new Entry(termIndex, response));
     notify();
 
     if (!isRatisEnabled) {
@@ -648,7 +651,7 @@ public final class OzoneManagerDoubleBuffer {
    * used by sync thread to flush transactions to DB.
    */
   private synchronized void swapCurrentAndReadyBuffer() {
-    Queue<DoubleBufferEntry<OMClientResponse>> temp = currentBuffer;
+    final Queue<Entry> temp = currentBuffer;
     currentBuffer = readyBuffer;
     readyBuffer = temp;
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/DoubleBufferEntry.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/DoubleBufferEntry.java
deleted file mode 100644
index f1b06d7f97..0000000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/DoubleBufferEntry.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.om.ratis.helpers;
-
-import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.ratis.server.protocol.TermIndex;
-
-/**
- * Entry in OzoneManagerDouble Buffer.
- * @param <Response>
- */
-public class DoubleBufferEntry<Response extends OMClientResponse> {
-
-  private final TermIndex termIndex;
-  private Response response;
-
-  public DoubleBufferEntry(TermIndex termIndex, Response response) {
-    this.termIndex = termIndex;
-    this.response = response;
-  }
-
-  public TermIndex getTermIndex() {
-    return termIndex;
-  }
-
-  public Response getResponse() {
-    return response;
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to