This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 72bce3d792 HDDS-8508. SCMHATransactionBuffer flush based on time (#4683)
72bce3d792 is described below

commit 72bce3d79209ea700b93005da533053172ce3e85
Author: Sumit Agrawal <[email protected]>
AuthorDate: Thu May 25 21:41:13 2023 +0530

    HDDS-8508. SCMHATransactionBuffer flush based on time (#4683)
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  | 18 ++++++
 .../common/src/main/resources/ozone-default.xml    | 12 ++++
 .../org/apache/hadoop/hdds/scm/ha/RatisUtil.java   |  3 +
 .../hdds/scm/ha/SCMHADBTransactionBuffer.java      |  5 ++
 .../hdds/scm/ha/SCMHADBTransactionBufferImpl.java  | 37 ++++++++++--
 .../hdds/scm/ha/SCMHADBTransactionBufferStub.java  | 11 ++++
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       | 28 +++++++++
 .../hadoop/hdds/scm/ha/SCMHAManagerStub.java       |  5 ++
 .../scm/ha/SCMHATransactionBufferMonitorTask.java  | 68 ++++++++++++++++++++++
 .../apache/hadoop/hdds/scm/ha/SCMRatisServer.java  |  2 +
 .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java     | 28 +++++++--
 .../hdds/scm/ha/TestReplicationAnnotation.java     |  5 ++
 .../hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java   |  3 +
 .../ozone/scm/TestStorageContainerManagerHA.java   | 23 +++++++-
 14 files changed, 236 insertions(+), 12 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 1ed7e0a806..f59cb1b22b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -560,11 +560,24 @@ public final class ScmConfigKeys {
           "ozone.scm.ha.ratis.log.purge.gap";
   public static final int OZONE_SCM_HA_RAFT_LOG_PURGE_GAP_DEFAULT = 1000000;
 
+  /**
+   * the config will transfer value to ratis config
+   * raft.server.snapshot.auto.trigger.threshold.
+   */
   public static final String OZONE_SCM_HA_RATIS_SNAPSHOT_THRESHOLD =
           "ozone.scm.ha.ratis.snapshot.threshold";
   public static final long OZONE_SCM_HA_RATIS_SNAPSHOT_THRESHOLD_DEFAULT =
           1000L;
 
+  /**
+   * the config will transfer value to ratis config
+   * raft.server.snapshot.creation.gap, used by ratis to take snapshot
+   * when manual trigger using api.
+   */
+  public static final String OZONE_SCM_HA_RATIS_SNAPSHOT_GAP
+      = "ozone.scm.ha.ratis.server.snapshot.creation.gap";
+  public static final long OZONE_SCM_HA_RATIS_SNAPSHOT_GAP_DEFAULT =
+      1024L;
   public static final String OZONE_SCM_HA_RATIS_SNAPSHOT_DIR =
           "ozone.scm.ha.ratis.snapshot.dir";
 
@@ -585,6 +598,11 @@ public final class ScmConfigKeys {
 
   public static final String OZONE_AUDIT_LOG_DEBUG_CMD_LIST_SCMAUDIT =
       "ozone.audit.log.debug.cmd.list.scmaudit";
+
+  public static final String OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL =
+      "ozone.scm.ha.dbtransactionbuffer.flush.interval";
+  public static final long
+      OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL_DEFAULT = 600 * 1000L;
   /**
    * Never constructed.
    */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 22671cfaa9..fa0dee1df6 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3435,6 +3435,18 @@
     <description>Wait duration before which close container
       is send to DN.</description>
   </property>
+  <property>
+    <name>ozone.scm.ha.ratis.server.snapshot.creation.gap</name>
+    <value>1024</value>
+    <tag>SCM, OZONE</tag>
+    <description>Raft snapshot gap index after which snapshot can be taken.</description>
+  </property>
+  <property>
+    <name>ozone.scm.ha.dbtransactionbuffer.flush.interval</name>
+    <value>600s</value>
+    <tag>SCM, OZONE</tag>
+    <description>Wait duration for flush of buffered transaction.</description>
+  </property>
 
 
   <property>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
index 92cf7a9423..67a9b6ad9a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
@@ -231,6 +231,9 @@ public final class RatisUtil {
     Snapshot.setAutoTriggerThreshold(properties,
         ozoneConf.getLong(ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_THRESHOLD,
                 ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_THRESHOLD_DEFAULT));
+    Snapshot.setCreationGap(properties,
+        ozoneConf.getLong(ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_GAP,
+            ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_GAP_DEFAULT));
   }
 
   public static void checkRatisException(IOException e, String port,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java
index f3f9f97e8c..b6d57f3384 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.ratis.statemachine.SnapshotInfo;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * DB transaction that buffers SCM DB transactions. Call the flush method
@@ -38,7 +39,11 @@ public interface SCMHADBTransactionBuffer
 
   void setLatestSnapshot(SnapshotInfo latestSnapshot);
 
+  AtomicReference<SnapshotInfo> getLatestSnapshotRef();
+
   void flush() throws IOException;
+  
+  boolean shouldFlush(long snapshotWaitTime);
 
   void init() throws IOException;
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
index eb5dcd7c27..19990dc23c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.ratis.statemachine.SnapshotInfo;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
@@ -43,7 +45,10 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
   private SCMMetadataStore metadataStore;
   private BatchOperation currentBatchOperation;
   private TransactionInfo latestTrxInfo;
-  private SnapshotInfo latestSnapshot;
+  private final AtomicReference<SnapshotInfo> latestSnapshot
+      = new AtomicReference<>();
+  private final AtomicLong txFlushPending = new AtomicLong(0);
+  private long lastSnapshotTimeMs = 0;
   private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
 
   public SCMHADBTransactionBufferImpl(StorageContainerManager scm)
@@ -61,6 +66,7 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
       Table<KEY, VALUE> table, KEY key, VALUE value) throws IOException {
     rwLock.readLock().lock();
     try {
+      txFlushPending.getAndIncrement();
       table.putWithBatch(getCurrentBatchOperation(), key, value);
     } finally {
       rwLock.readLock().unlock();
@@ -72,6 +78,7 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
       throws IOException {
     rwLock.readLock().lock();
     try {
+      txFlushPending.getAndIncrement();
       table.deleteWithBatch(getCurrentBatchOperation(), key);
     } finally {
       rwLock.readLock().unlock();
@@ -95,12 +102,17 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
 
   @Override
   public SnapshotInfo getLatestSnapshot() {
-    return latestSnapshot;
+    return latestSnapshot.get();
   }
 
   @Override
   public void setLatestSnapshot(SnapshotInfo latestSnapshot) {
-    this.latestSnapshot = latestSnapshot;
+    this.latestSnapshot.set(latestSnapshot);
+  }
+
+  @Override
+  public AtomicReference<SnapshotInfo> getLatestSnapshotRef() {
+    return latestSnapshot;
   }
 
   @Override
@@ -115,7 +127,7 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
 
       metadataStore.getStore().commitBatchOperation(currentBatchOperation);
       currentBatchOperation.close();
-      this.latestSnapshot = latestTrxInfo.toSnapshotInfo();
+      this.latestSnapshot.set(latestTrxInfo.toSnapshotInfo());
       // reset batch operation
       currentBatchOperation = metadataStore.getStore().initBatchOperation();
 
@@ -125,6 +137,8 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
           deletedBlockLog instanceof DeletedBlockLogImpl);
       ((DeletedBlockLogImpl) deletedBlockLog).onFlush();
     } finally {
+      txFlushPending.set(0);
+      lastSnapshotTimeMs = scm.getSystemClock().millis();
       rwLock.writeLock().unlock();
     }
   }
@@ -151,11 +165,24 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
                 .setCurrentTerm(0)
                 .build();
       }
-      latestSnapshot = latestTrxInfo.toSnapshotInfo();
+      latestSnapshot.set(latestTrxInfo.toSnapshotInfo());
     } finally {
+      txFlushPending.set(0);
+      lastSnapshotTimeMs = scm.getSystemClock().millis();
       rwLock.writeLock().unlock();
     }
   }
+  
+  @Override
+  public boolean shouldFlush(long snapshotWaitTime) {
+    rwLock.readLock().lock();
+    try {
+      long timeDiff = scm.getSystemClock().millis() - lastSnapshotTimeMs;
+      return txFlushPending.get() > 0 && timeDiff > snapshotWaitTime;
+    } finally {
+      rwLock.readLock().unlock();
+    }
+  }
 
   @Override
   public String toString() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
index 457c8c23f3..66d51c08c5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.ratis.statemachine.SnapshotInfo;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 // TODO: Move this class to test package after fixing Recon
@@ -95,6 +96,11 @@ public class SCMHADBTransactionBufferStub implements SCMHADBTransactionBuffer {
 
   }
 
+  @Override
+  public AtomicReference<SnapshotInfo> getLatestSnapshotRef() {
+    return null;
+  }
+
   @Override
   public void flush() throws IOException {
     rwLock.writeLock().lock();
@@ -111,6 +117,11 @@ public class SCMHADBTransactionBufferStub implements SCMHADBTransactionBuffer {
     }
   }
 
+  @Override
+  public boolean shouldFlush(long snapshotWaitTime) {
+    return true;
+  }
+
   @Override
   public void init() throws IOException {
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 6af0fb9919..57c47b4b9d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.ha;
 
 import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
@@ -45,6 +46,9 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL_DEFAULT;
+
 /**
  * SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
  * node Ratis ring. The Ratis ring will have one Leader node and 2N follower
@@ -67,6 +71,7 @@ public class SCMHAManagerImpl implements SCMHAManager {
 
   // this should ideally be started only in a ratis leader
   private final InterSCMGrpcProtocolService grpcServer;
+  private BackgroundSCMService trxBufferMonitorService = null;
 
   /**
    * Creates SCMHAManager instance.
@@ -125,6 +130,26 @@ public class SCMHAManagerImpl implements SCMHAManager {
           ratisServer.getDivision().getGroup().getPeers());
     }
     grpcServer.start();
+    createStartTransactionBufferMonitor();
+  }
+
+  private void createStartTransactionBufferMonitor() {
+    long interval = conf.getTimeDuration(
+        OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL,
+        OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    SCMHATransactionBufferMonitorTask monitorTask
+        = new SCMHATransactionBufferMonitorTask(
+        (SCMHADBTransactionBuffer) transactionBuffer, ratisServer, interval);
+    trxBufferMonitorService =
+        new BackgroundSCMService.Builder().setClock(scm.getSystemClock())
+            .setScmContext(scm.getScmContext())
+            .setServiceName("SCMHATransactionMonitor")
+            .setIntervalInMillis(interval)
+            .setWaitTimeInMillis(interval)
+            .setPeriodicalTask(monitorTask).build();
+    scm.getSCMServiceManager().register(trxBufferMonitorService);
+    trxBufferMonitorService.start();
   }
 
   public SCMRatisServer getRatisServer() {
@@ -342,6 +367,9 @@ public class SCMHAManagerImpl implements SCMHAManager {
       grpcServer.stop();
       close();
     }
+    if (trxBufferMonitorService != null) {
+      trxBufferMonitorService.stop();
+    }
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
index 6382df3483..7c926ce884 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
@@ -203,6 +203,11 @@ public final class SCMHAManagerStub implements SCMHAManager {
       return SCMRatisResponse.decode(reply);
     }
 
+    @Override
+    public boolean triggerSnapshot() throws IOException {
+      throw new IOException("submitSnapshotRequest is called.");
+    }
+
     private Message process(final SCMRatisRequest request) throws Exception {
       try {
         final Object handler = handlers.get(request.getType());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHATransactionBufferMonitorTask.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHATransactionBufferMonitorTask.java
new file mode 100644
index 0000000000..1eb7690290
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHATransactionBufferMonitorTask.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.  
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A background service running in SCM to check and flush the HA Transaction
+ * buffer.
+ */
+public class SCMHATransactionBufferMonitorTask implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMHATransactionBufferMonitorTask.class);
+  private final SCMRatisServer server;
+  private final SCMHADBTransactionBuffer transactionBuffer;
+  private final long flushInterval;
+
+  /**
+   * SCMService related variables.
+   */
+  public SCMHATransactionBufferMonitorTask(
+      SCMHADBTransactionBuffer transactionBuffer,
+      SCMRatisServer server, long flushInterval) {
+    this.flushInterval = flushInterval;
+    this.transactionBuffer = transactionBuffer;
+    this.server = server;
+  }
+
+  @Override
+  public void run() {
+    if (transactionBuffer.shouldFlush(flushInterval)) {
+      LOG.debug("Running TransactionFlushTask");
+      // set latest snapshot to null for force snapshot
+      // the value will be reset again when snapshot is taken
+      final SnapshotInfo lastSnapshot = transactionBuffer
+          .getLatestSnapshotRef().getAndSet(null);
+      try {
+        server.triggerSnapshot();
+      } catch (IOException e) {
+        LOG.error("Snapshot request is failed", e);
+      } finally {
+        // under failure case, if unable to take snapshot, its value
+        // is reset to previous known value
+        transactionBuffer.getLatestSnapshotRef().compareAndSet(
+            null, lastSnapshot);
+      }
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
index d5de48107e..a786bd2944 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
@@ -42,6 +42,8 @@ public interface SCMRatisServer {
       throws IOException, ExecutionException, InterruptedException,
       TimeoutException;
 
+  boolean triggerSnapshot() throws IOException;
+
   void stop() throws IOException;
 
   boolean isStopped();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 8360a8de37..fa4e12e0b4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -52,6 +52,7 @@ import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SnapshotManagementRequest;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.server.RaftServer;
@@ -77,6 +78,7 @@ public class SCMRatisServerImpl implements SCMRatisServer {
   private final RaftServer.Division division;
   private final GrpcTlsConfig grpcTlsConfig;
   private boolean isStopped;
+  private final long requestTimeout;
 
   // TODO: Refactor and remove ConfigurationSource and use only
   //  SCMHAConfiguration.
@@ -84,6 +86,14 @@ public class SCMRatisServerImpl implements SCMRatisServer {
       final StorageContainerManager scm, final SCMHADBTransactionBuffer buffer)
       throws IOException {
     this.scm = scm;
+    
+    requestTimeout = ozoneConf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_HA_RATIS_REQUEST_TIMEOUT,
+        ScmConfigKeys.OZONE_SCM_HA_RATIS_REQUEST_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    Preconditions.checkArgument(requestTimeout > 1000L,
+        "Ratis request timeout cannot be less than 1000ms.");
+    
     final RaftGroupId groupId = buildRaftGroupId(scm.getClusterId());
     LOG.info("starting Raft server for scm:{}", scm.getScmId());
     // During SCM startup, the bootstrapped node will be started just with
@@ -214,12 +224,6 @@ public class SCMRatisServerImpl implements SCMRatisServer {
         .setType(RaftClientRequest.writeRequestType())
         .build();
     // any request submitted to
-    final long requestTimeout = ozoneConf.getTimeDuration(
-                ScmConfigKeys.OZONE_SCM_HA_RATIS_REQUEST_TIMEOUT,
-                ScmConfigKeys.OZONE_SCM_HA_RATIS_REQUEST_TIMEOUT_DEFAULT,
-                TimeUnit.MILLISECONDS);
-    Preconditions.checkArgument(requestTimeout > 1000L,
-            "Ratis request timeout cannot be less than 1000ms.");
     final RaftClientReply raftClientReply =
         server.submitClientRequestAsync(raftClientRequest)
             .get(requestTimeout, TimeUnit.MILLISECONDS);
@@ -229,6 +233,18 @@ public class SCMRatisServerImpl implements SCMRatisServer {
     return SCMRatisResponse.decode(raftClientReply);
   }
 
+  @Override
+  public boolean triggerSnapshot() throws IOException {
+    final SnapshotManagementRequest req = SnapshotManagementRequest.newCreate(
+        clientId, getDivision().getId(), getDivision().getGroup().getGroupId(),
+        nextCallId(), requestTimeout);
+    final RaftClientReply raftClientReply = server.snapshotManagement(req);
+    if (!raftClientReply.isSuccess()) {
+      LOG.warn("Snapshot request failed", raftClientReply.getException());
+    }
+    return raftClientReply.isSuccess();
+  }
+
   private long nextCallId() {
     return callId.getAndIncrement() & Long.MAX_VALUE;
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
index a1e7a30f01..24a09bdac2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
@@ -63,6 +63,11 @@ public class TestReplicationAnnotation {
         throw new IOException("submitRequest is called.");
       }
 
+      @Override
+      public boolean triggerSnapshot() throws IOException {
+        throw new IOException("submitSnapshotRequest is called.");
+      }
+
       @Override
       public void stop() throws IOException {
       }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java
index 11209217cc..f1fd5590c0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.hdds.scm.ha;
 
+import java.time.Clock;
+import java.time.ZoneOffset;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -253,6 +255,7 @@ class TestSCMHAManagerImpl {
     when(dbStore.initBatchOperation()).thenReturn(batchOperation);
     when(nodeDetails.getRatisHostPortStr()).thenReturn("localhost:" +
         conf.get(ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY));
+    when(scm.getSystemClock()).thenReturn(Clock.system(ZoneOffset.UTC));
 
     final SCMHAManager manager = new SCMHAManagerImpl(conf, scm);
     when(scm.getScmHAManager()).thenReturn(manager);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
index 942bb6a575..ff2d9dae69 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.statemachine.SnapshotInfo;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -90,6 +91,9 @@ public class TestStorageContainerManagerHA {
   public void init() throws Exception {
     conf = new OzoneConfiguration();
     conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
+    conf.set(ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL,
+        "5s");
+    conf.set(ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_GAP, "1");
     clusterId = UUID.randomUUID().toString();
     scmId = UUID.randomUUID().toString();
     omServiceId = "om-service-test1";
@@ -123,9 +127,11 @@ public class TestStorageContainerManagerHA {
     Assertions.assertEquals(numOfSCMs, scms.size());
     int peerSize = cluster.getStorageContainerManager().getScmHAManager()
         .getRatisServer().getDivision().getGroup().getPeers().size();
+    StorageContainerManager leaderScm = null;
     for (StorageContainerManager scm : scms) {
       if (scm.checkLeader()) {
         count++;
+        leaderScm = scm;
       }
       Assertions.assertEquals(peerSize, numOfSCMs);
     }
@@ -139,7 +145,22 @@ public class TestStorageContainerManagerHA {
       }
     }
     Assertions.assertEquals(1, count);
+    
+    // verify timer based transaction buffer flush is working
+    SnapshotInfo latestSnapshot = leaderScm.getScmHAManager()
+        .asSCMHADBTransactionBuffer().getLatestSnapshot();
     testPutKey();
+    final StorageContainerManager leaderScmTmp = leaderScm;
+    GenericTestUtils.waitFor(() -> {
+      if (leaderScmTmp.getScmHAManager().asSCMHADBTransactionBuffer()
+          .getLatestSnapshot() != null) {
+        if (leaderScmTmp.getScmHAManager().asSCMHADBTransactionBuffer()
+            .getLatestSnapshot().getIndex() > latestSnapshot.getIndex()) {
+          return true;
+        }
+      }
+      return false;
+    }, 2000, 30000);
   }
 
   @Test
@@ -275,7 +296,7 @@ public class TestStorageContainerManagerHA {
     DefaultConfigManager.clearDefaultConfigs();
     scmStorageConfig.initialize();
     Assertions.assertThrows(ConfigurationException.class,
-            () -> StorageContainerManager.scmInit(conf, clusterId));
+        () -> StorageContainerManager.scmInit(conf, clusterId));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to