This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 72bce3d792 HDDS-8508. SCMHATransactionBuffer flush based on time (#4683)
72bce3d792 is described below
commit 72bce3d79209ea700b93005da533053172ce3e85
Author: Sumit Agrawal <[email protected]>
AuthorDate: Thu May 25 21:41:13 2023 +0530
HDDS-8508. SCMHATransactionBuffer flush based on time (#4683)
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 18 ++++++
.../common/src/main/resources/ozone-default.xml | 12 ++++
.../org/apache/hadoop/hdds/scm/ha/RatisUtil.java | 3 +
.../hdds/scm/ha/SCMHADBTransactionBuffer.java | 5 ++
.../hdds/scm/ha/SCMHADBTransactionBufferImpl.java | 37 ++++++++++--
.../hdds/scm/ha/SCMHADBTransactionBufferStub.java | 11 ++++
.../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 28 +++++++++
.../hadoop/hdds/scm/ha/SCMHAManagerStub.java | 5 ++
.../scm/ha/SCMHATransactionBufferMonitorTask.java | 68 ++++++++++++++++++++++
.../apache/hadoop/hdds/scm/ha/SCMRatisServer.java | 2 +
.../hadoop/hdds/scm/ha/SCMRatisServerImpl.java | 28 +++++++--
.../hdds/scm/ha/TestReplicationAnnotation.java | 5 ++
.../hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java | 3 +
.../ozone/scm/TestStorageContainerManagerHA.java | 23 +++++++-
14 files changed, 236 insertions(+), 12 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 1ed7e0a806..f59cb1b22b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -560,11 +560,24 @@ public final class ScmConfigKeys {
"ozone.scm.ha.ratis.log.purge.gap";
public static final int OZONE_SCM_HA_RAFT_LOG_PURGE_GAP_DEFAULT = 1000000;
+ /**
+ * the config will transfer value to ratis config
+ * raft.server.snapshot.auto.trigger.threshold.
+ */
public static final String OZONE_SCM_HA_RATIS_SNAPSHOT_THRESHOLD =
"ozone.scm.ha.ratis.snapshot.threshold";
public static final long OZONE_SCM_HA_RATIS_SNAPSHOT_THRESHOLD_DEFAULT =
1000L;
+ /**
+ * the config will transfer value to ratis config
+ * raft.server.snapshot.creation.gap, used by ratis to take snapshot
+ * when manual trigger using api.
+ */
+ public static final String OZONE_SCM_HA_RATIS_SNAPSHOT_GAP
+ = "ozone.scm.ha.ratis.server.snapshot.creation.gap";
+ public static final long OZONE_SCM_HA_RATIS_SNAPSHOT_GAP_DEFAULT =
+ 1024L;
public static final String OZONE_SCM_HA_RATIS_SNAPSHOT_DIR =
"ozone.scm.ha.ratis.snapshot.dir";
@@ -585,6 +598,11 @@ public final class ScmConfigKeys {
public static final String OZONE_AUDIT_LOG_DEBUG_CMD_LIST_SCMAUDIT =
"ozone.audit.log.debug.cmd.list.scmaudit";
+
+ public static final String OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL =
+ "ozone.scm.ha.dbtransactionbuffer.flush.interval";
+ public static final long
+ OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL_DEFAULT = 600 * 1000L;
/**
* Never constructed.
*/
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 22671cfaa9..fa0dee1df6 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3435,6 +3435,18 @@
<description>Wait duration before which close container
is send to DN.</description>
</property>
+ <property>
+ <name>ozone.scm.ha.ratis.server.snapshot.creation.gap</name>
+ <value>1024</value>
+ <tag>SCM, OZONE</tag>
+ <description>Raft snapshot gap index after which snapshot can be
taken.</description>
+ </property>
+ <property>
+ <name>ozone.scm.ha.dbtransactionbuffer.flush.interval</name>
+ <value>600s</value>
+ <tag>SCM, OZONE</tag>
+ <description>Wait duration for flush of buffered transaction.</description>
+ </property>
<property>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
index 92cf7a9423..67a9b6ad9a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
@@ -231,6 +231,9 @@ public final class RatisUtil {
Snapshot.setAutoTriggerThreshold(properties,
ozoneConf.getLong(ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_THRESHOLD,
ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_THRESHOLD_DEFAULT));
+ Snapshot.setCreationGap(properties,
+ ozoneConf.getLong(ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_GAP,
+ ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_GAP_DEFAULT));
}
public static void checkRatisException(IOException e, String port,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java
index f3f9f97e8c..b6d57f3384 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.ratis.statemachine.SnapshotInfo;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
/**
* DB transaction that buffers SCM DB transactions. Call the flush method
@@ -38,7 +39,11 @@ public interface SCMHADBTransactionBuffer
void setLatestSnapshot(SnapshotInfo latestSnapshot);
+ AtomicReference<SnapshotInfo> getLatestSnapshotRef();
+
void flush() throws IOException;
+
+ boolean shouldFlush(long snapshotWaitTime);
void init() throws IOException;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
index eb5dcd7c27..19990dc23c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.ratis.statemachine.SnapshotInfo;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
@@ -43,7 +45,10 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
private SCMMetadataStore metadataStore;
private BatchOperation currentBatchOperation;
private TransactionInfo latestTrxInfo;
- private SnapshotInfo latestSnapshot;
+ private final AtomicReference<SnapshotInfo> latestSnapshot
+ = new AtomicReference<>();
+ private final AtomicLong txFlushPending = new AtomicLong(0);
+ private long lastSnapshotTimeMs = 0;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public SCMHADBTransactionBufferImpl(StorageContainerManager scm)
@@ -61,6 +66,7 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
Table<KEY, VALUE> table, KEY key, VALUE value) throws IOException {
rwLock.readLock().lock();
try {
+ txFlushPending.getAndIncrement();
table.putWithBatch(getCurrentBatchOperation(), key, value);
} finally {
rwLock.readLock().unlock();
@@ -72,6 +78,7 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
throws IOException {
rwLock.readLock().lock();
try {
+ txFlushPending.getAndIncrement();
table.deleteWithBatch(getCurrentBatchOperation(), key);
} finally {
rwLock.readLock().unlock();
@@ -95,12 +102,17 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
@Override
public SnapshotInfo getLatestSnapshot() {
- return latestSnapshot;
+ return latestSnapshot.get();
}
@Override
public void setLatestSnapshot(SnapshotInfo latestSnapshot) {
- this.latestSnapshot = latestSnapshot;
+ this.latestSnapshot.set(latestSnapshot);
+ }
+
+ @Override
+ public AtomicReference<SnapshotInfo> getLatestSnapshotRef() {
+ return latestSnapshot;
}
@Override
@@ -115,7 +127,7 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
metadataStore.getStore().commitBatchOperation(currentBatchOperation);
currentBatchOperation.close();
- this.latestSnapshot = latestTrxInfo.toSnapshotInfo();
+ this.latestSnapshot.set(latestTrxInfo.toSnapshotInfo());
// reset batch operation
currentBatchOperation = metadataStore.getStore().initBatchOperation();
@@ -125,6 +137,8 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
deletedBlockLog instanceof DeletedBlockLogImpl);
((DeletedBlockLogImpl) deletedBlockLog).onFlush();
} finally {
+ txFlushPending.set(0);
+ lastSnapshotTimeMs = scm.getSystemClock().millis();
rwLock.writeLock().unlock();
}
}
@@ -151,11 +165,24 @@ public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {
.setCurrentTerm(0)
.build();
}
- latestSnapshot = latestTrxInfo.toSnapshotInfo();
+ latestSnapshot.set(latestTrxInfo.toSnapshotInfo());
} finally {
+ txFlushPending.set(0);
+ lastSnapshotTimeMs = scm.getSystemClock().millis();
rwLock.writeLock().unlock();
}
}
+
+ @Override
+ public boolean shouldFlush(long snapshotWaitTime) {
+ rwLock.readLock().lock();
+ try {
+ long timeDiff = scm.getSystemClock().millis() - lastSnapshotTimeMs;
+ return txFlushPending.get() > 0 && timeDiff > snapshotWaitTime;
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
@Override
public String toString() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
index 457c8c23f3..66d51c08c5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.ratis.statemachine.SnapshotInfo;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
// TODO: Move this class to test package after fixing Recon
@@ -95,6 +96,11 @@ public class SCMHADBTransactionBufferStub implements SCMHADBTransactionBuffer {
}
+ @Override
+ public AtomicReference<SnapshotInfo> getLatestSnapshotRef() {
+ return null;
+ }
+
@Override
public void flush() throws IOException {
rwLock.writeLock().lock();
@@ -111,6 +117,11 @@ public class SCMHADBTransactionBufferStub implements SCMHADBTransactionBuffer {
}
}
+ @Override
+ public boolean shouldFlush(long snapshotWaitTime) {
+ return true;
+ }
+
@Override
public void init() throws IOException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 6af0fb9919..57c47b4b9d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.ha;
import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
@@ -45,6 +46,9 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL_DEFAULT;
+
/**
* SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
* node Ratis ring. The Ratis ring will have one Leader node and 2N follower
@@ -67,6 +71,7 @@ public class SCMHAManagerImpl implements SCMHAManager {
// this should ideally be started only in a ratis leader
private final InterSCMGrpcProtocolService grpcServer;
+ private BackgroundSCMService trxBufferMonitorService = null;
/**
* Creates SCMHAManager instance.
@@ -125,6 +130,26 @@ public class SCMHAManagerImpl implements SCMHAManager {
ratisServer.getDivision().getGroup().getPeers());
}
grpcServer.start();
+ createStartTransactionBufferMonitor();
+ }
+
+ private void createStartTransactionBufferMonitor() {
+ long interval = conf.getTimeDuration(
+ OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL,
+ OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ SCMHATransactionBufferMonitorTask monitorTask
+ = new SCMHATransactionBufferMonitorTask(
+ (SCMHADBTransactionBuffer) transactionBuffer, ratisServer, interval);
+ trxBufferMonitorService =
+ new BackgroundSCMService.Builder().setClock(scm.getSystemClock())
+ .setScmContext(scm.getScmContext())
+ .setServiceName("SCMHATransactionMonitor")
+ .setIntervalInMillis(interval)
+ .setWaitTimeInMillis(interval)
+ .setPeriodicalTask(monitorTask).build();
+ scm.getSCMServiceManager().register(trxBufferMonitorService);
+ trxBufferMonitorService.start();
}
public SCMRatisServer getRatisServer() {
@@ -342,6 +367,9 @@ public class SCMHAManagerImpl implements SCMHAManager {
grpcServer.stop();
close();
}
+ if (trxBufferMonitorService != null) {
+ trxBufferMonitorService.stop();
+ }
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
index 6382df3483..7c926ce884 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
@@ -203,6 +203,11 @@ public final class SCMHAManagerStub implements SCMHAManager {
return SCMRatisResponse.decode(reply);
}
+ @Override
+ public boolean triggerSnapshot() throws IOException {
+ throw new IOException("submitSnapshotRequest is called.");
+ }
+
private Message process(final SCMRatisRequest request) throws Exception {
try {
final Object handler = handlers.get(request.getType());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHATransactionBufferMonitorTask.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHATransactionBufferMonitorTask.java
new file mode 100644
index 0000000000..1eb7690290
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHATransactionBufferMonitorTask.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A background service running in SCM to check and flush the HA Transaction
+ * buffer.
+ */
+public class SCMHATransactionBufferMonitorTask implements Runnable {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SCMHATransactionBufferMonitorTask.class);
+ private final SCMRatisServer server;
+ private final SCMHADBTransactionBuffer transactionBuffer;
+ private final long flushInterval;
+
+ /**
+ * SCMService related variables.
+ */
+ public SCMHATransactionBufferMonitorTask(
+ SCMHADBTransactionBuffer transactionBuffer,
+ SCMRatisServer server, long flushInterval) {
+ this.flushInterval = flushInterval;
+ this.transactionBuffer = transactionBuffer;
+ this.server = server;
+ }
+
+ @Override
+ public void run() {
+ if (transactionBuffer.shouldFlush(flushInterval)) {
+ LOG.debug("Running TransactionFlushTask");
+ // set latest snapshot to null for force snapshot
+ // the value will be reset again when snapshot is taken
+ final SnapshotInfo lastSnapshot = transactionBuffer
+ .getLatestSnapshotRef().getAndSet(null);
+ try {
+ server.triggerSnapshot();
+ } catch (IOException e) {
+ LOG.error("Snapshot request is failed", e);
+ } finally {
+ // under failure case, if unable to take snapshot, its value
+ // is reset to previous known value
+ transactionBuffer.getLatestSnapshotRef().compareAndSet(
+ null, lastSnapshot);
+ }
+ }
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
index d5de48107e..a786bd2944 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
@@ -42,6 +42,8 @@ public interface SCMRatisServer {
throws IOException, ExecutionException, InterruptedException,
TimeoutException;
+ boolean triggerSnapshot() throws IOException;
+
void stop() throws IOException;
boolean isStopped();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 8360a8de37..fa4e12e0b4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -52,6 +52,7 @@ import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SnapshotManagementRequest;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.server.RaftServer;
@@ -77,6 +78,7 @@ public class SCMRatisServerImpl implements SCMRatisServer {
private final RaftServer.Division division;
private final GrpcTlsConfig grpcTlsConfig;
private boolean isStopped;
+ private final long requestTimeout;
// TODO: Refactor and remove ConfigurationSource and use only
// SCMHAConfiguration.
@@ -84,6 +86,14 @@ public class SCMRatisServerImpl implements SCMRatisServer {
final StorageContainerManager scm, final SCMHADBTransactionBuffer buffer)
throws IOException {
this.scm = scm;
+
+ requestTimeout = ozoneConf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_HA_RATIS_REQUEST_TIMEOUT,
+ ScmConfigKeys.OZONE_SCM_HA_RATIS_REQUEST_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ Preconditions.checkArgument(requestTimeout > 1000L,
+ "Ratis request timeout cannot be less than 1000ms.");
+
final RaftGroupId groupId = buildRaftGroupId(scm.getClusterId());
LOG.info("starting Raft server for scm:{}", scm.getScmId());
// During SCM startup, the bootstrapped node will be started just with
@@ -214,12 +224,6 @@ public class SCMRatisServerImpl implements SCMRatisServer {
.setType(RaftClientRequest.writeRequestType())
.build();
// any request submitted to
- final long requestTimeout = ozoneConf.getTimeDuration(
- ScmConfigKeys.OZONE_SCM_HA_RATIS_REQUEST_TIMEOUT,
- ScmConfigKeys.OZONE_SCM_HA_RATIS_REQUEST_TIMEOUT_DEFAULT,
- TimeUnit.MILLISECONDS);
- Preconditions.checkArgument(requestTimeout > 1000L,
- "Ratis request timeout cannot be less than 1000ms.");
final RaftClientReply raftClientReply =
server.submitClientRequestAsync(raftClientRequest)
.get(requestTimeout, TimeUnit.MILLISECONDS);
@@ -229,6 +233,18 @@ public class SCMRatisServerImpl implements SCMRatisServer {
return SCMRatisResponse.decode(raftClientReply);
}
+ @Override
+ public boolean triggerSnapshot() throws IOException {
+ final SnapshotManagementRequest req = SnapshotManagementRequest.newCreate(
+ clientId, getDivision().getId(), getDivision().getGroup().getGroupId(),
+ nextCallId(), requestTimeout);
+ final RaftClientReply raftClientReply = server.snapshotManagement(req);
+ if (!raftClientReply.isSuccess()) {
+ LOG.warn("Snapshot request failed", raftClientReply.getException());
+ }
+ return raftClientReply.isSuccess();
+ }
+
private long nextCallId() {
return callId.getAndIncrement() & Long.MAX_VALUE;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
index a1e7a30f01..24a09bdac2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
@@ -63,6 +63,11 @@ public class TestReplicationAnnotation {
throw new IOException("submitRequest is called.");
}
+ @Override
+ public boolean triggerSnapshot() throws IOException {
+ throw new IOException("submitSnapshotRequest is called.");
+ }
+
@Override
public void stop() throws IOException {
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java
index 11209217cc..f1fd5590c0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.hdds.scm.ha;
+import java.time.Clock;
+import java.time.ZoneOffset;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -253,6 +255,7 @@ class TestSCMHAManagerImpl {
when(dbStore.initBatchOperation()).thenReturn(batchOperation);
when(nodeDetails.getRatisHostPortStr()).thenReturn("localhost:" +
conf.get(ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY));
+ when(scm.getSystemClock()).thenReturn(Clock.system(ZoneOffset.UTC));
final SCMHAManager manager = new SCMHAManagerImpl(conf, scm);
when(scm.getScmHAManager()).thenReturn(manager);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
index 942bb6a575..ff2d9dae69 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.statemachine.SnapshotInfo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -90,6 +91,9 @@ public class TestStorageContainerManagerHA {
public void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
+ conf.set(ScmConfigKeys.OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL,
+ "5s");
+ conf.set(ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_GAP, "1");
clusterId = UUID.randomUUID().toString();
scmId = UUID.randomUUID().toString();
omServiceId = "om-service-test1";
@@ -123,9 +127,11 @@ public class TestStorageContainerManagerHA {
Assertions.assertEquals(numOfSCMs, scms.size());
int peerSize = cluster.getStorageContainerManager().getScmHAManager()
.getRatisServer().getDivision().getGroup().getPeers().size();
+ StorageContainerManager leaderScm = null;
for (StorageContainerManager scm : scms) {
if (scm.checkLeader()) {
count++;
+ leaderScm = scm;
}
Assertions.assertEquals(peerSize, numOfSCMs);
}
@@ -139,7 +145,22 @@ public class TestStorageContainerManagerHA {
}
}
Assertions.assertEquals(1, count);
+
+ // verify timer based transaction buffer flush is working
+ SnapshotInfo latestSnapshot = leaderScm.getScmHAManager()
+ .asSCMHADBTransactionBuffer().getLatestSnapshot();
testPutKey();
+ final StorageContainerManager leaderScmTmp = leaderScm;
+ GenericTestUtils.waitFor(() -> {
+ if (leaderScmTmp.getScmHAManager().asSCMHADBTransactionBuffer()
+ .getLatestSnapshot() != null) {
+ if (leaderScmTmp.getScmHAManager().asSCMHADBTransactionBuffer()
+ .getLatestSnapshot().getIndex() > latestSnapshot.getIndex()) {
+ return true;
+ }
+ }
+ return false;
+ }, 2000, 30000);
}
@Test
@@ -275,7 +296,7 @@ public class TestStorageContainerManagerHA {
DefaultConfigManager.clearDefaultConfigs();
scmStorageConfig.initialize();
Assertions.assertThrows(ConfigurationException.class,
- () -> StorageContainerManager.scmInit(conf, clusterId));
+ () -> StorageContainerManager.scmInit(conf, clusterId));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]