This is an automated email from the ASF dual-hosted git repository.
arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new cdc36fe HDDS-1649. On installSnapshot notification from OM leader,
download checkpoint and reload OM state (#948)
cdc36fe is described below
commit cdc36fe286708b5ff12675599da8c7650744f064
Author: Hanisha Koneru <[email protected]>
AuthorDate: Mon Jul 22 12:06:55 2019 -0700
HDDS-1649. On installSnapshot notification from OM leader, download
checkpoint and reload OM state (#948)
---
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 1 +
.../common/src/main/resources/ozone-default.xml | 8 +
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 3 +
.../hadoop/ozone/om/exceptions/OMException.java | 3 +-
.../ozone/om/protocol/OzoneManagerHAProtocol.java | 3 +-
.../src/main/proto/OzoneManagerProtocol.proto | 2 +
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 6 +
.../hadoop/ozone/MiniOzoneHAClusterImpl.java | 49 ++-
.../hadoop/ozone/om/TestOMRatisSnapshots.java | 189 +++++++++++
.../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 7 +-
.../hadoop/ozone/om/OMDBCheckpointServlet.java | 2 +-
.../java/org/apache/hadoop/ozone/om/OMMetrics.java | 9 +-
.../org/apache/hadoop/ozone/om/OzoneManager.java | 359 ++++++++++++++++-----
.../ozone/om/ratis/OzoneManagerRatisServer.java | 15 +-
.../ozone/om/ratis/OzoneManagerStateMachine.java | 81 ++++-
.../om/snapshot/OzoneManagerSnapshotProvider.java | 2 +-
16 files changed, 637 insertions(+), 102 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index d28e477..67bd22d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -119,6 +119,7 @@ public final class OzoneConsts {
public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
public static final String DELETED_BLOCK_DB = "deletedBlock.db";
public static final String OM_DB_NAME = "om.db";
+ public static final String OM_DB_BACKUP_PREFIX = "om.db.backup.";
public static final String OM_DB_CHECKPOINTS_DIR_NAME = "om.db.checkpoints";
public static final String OZONE_MANAGER_TOKEN_DB_NAME = "om-token.db";
public static final String SCM_DB_NAME = "scm.db";
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 30cf386..b2f820b 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1630,6 +1630,14 @@
<description>Byte limit for Raft's Log Worker queue.
</description>
</property>
+ <property>
+ <name>ozone.om.ratis.log.purge.gap</name>
+ <value>1000000</value>
+ <tag>OZONE, OM, RATIS</tag>
+ <description>The minimum gap between log indices for the Raft server to
+ purge its log segments after taking a snapshot.
+ </description>
+ </property>
<property>
<name>ozone.om.ratis.snapshot.auto.trigger.threshold</name>
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 14b6783..35431fa 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -123,6 +123,9 @@ public final class OMConfigKeys {
"ozone.om.ratis.log.appender.queue.byte-limit";
public static final String
OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
+ public static final String OZONE_OM_RATIS_LOG_PURGE_GAP =
+ "ozone.om.ratis.log.purge.gap";
+ public static final int OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT = 1000000;
// OM Snapshot configurations
public static final String OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index 66ce1cc..78bdb21 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -203,7 +203,8 @@ public class OMException extends IOException {
PREFIX_NOT_FOUND,
- S3_BUCKET_INVALID_LENGTH
+ S3_BUCKET_INVALID_LENGTH,
+ RATIS_ERROR // Error in Ratis server
}
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
index 1434dca..675c814 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
@@ -29,9 +29,10 @@ public interface OzoneManagerHAProtocol {
/**
* Store the snapshot index i.e. the raft log index, corresponding to the
* last transaction applied to the OM RocksDB, in OM metadata dir on disk.
+ * @param flush if true, flush the OM DB to disk before saving the index
* @return the snapshot index
* @throws IOException
*/
- long saveRatisSnapshot() throws IOException;
+ long saveRatisSnapshot(boolean flush) throws IOException;
}
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 7007d98..5dd2b55 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -278,6 +278,8 @@ enum Status {
PREFIX_NOT_FOUND=50;
S3_BUCKET_INVALID_LENGTH = 51; // s3 bucket invalid length.
+
+ RATIS_ERROR = 52;
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 4927da1..1139a65 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -241,6 +241,7 @@ public interface MiniOzoneCluster {
protected String clusterId;
protected String omServiceId;
protected int numOfOMs;
+ protected int numOfActiveOMs;
protected Optional<Boolean> enableTrace = Optional.of(false);
protected Optional<Integer> hbInterval = Optional.empty();
@@ -440,6 +441,11 @@ public interface MiniOzoneCluster {
return this;
}
+ public Builder setNumOfActiveOMs(int numActiveOMs) {
+ this.numOfActiveOMs = numActiveOMs;
+ return this;
+ }
+
public Builder setStreamBufferSizeUnit(StorageUnit unit) {
this.streamBufferSizeUnit = Optional.of(unit);
return this;
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index 4d00710..1d9a99e 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -53,6 +53,10 @@ public final class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
private Map<String, OzoneManager> ozoneManagerMap;
private List<OzoneManager> ozoneManagers;
+ // Active OMs are started and running; inactive OMs are created but not started
+ private List<OzoneManager> activeOMs;
+ private List<OzoneManager> inactiveOMs;
+
private static final Random RANDOM = new Random();
private static final int RATIS_LEADER_ELECTION_TIMEOUT = 1000; // 1 seconds
@@ -67,11 +71,15 @@ public final class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
private MiniOzoneHAClusterImpl(
OzoneConfiguration conf,
Map<String, OzoneManager> omMap,
+ List<OzoneManager> activeOMList,
+ List<OzoneManager> inactiveOMList,
StorageContainerManager scm,
List<HddsDatanodeService> hddsDatanodes) {
super(conf, scm, hddsDatanodes);
this.ozoneManagerMap = omMap;
this.ozoneManagers = new ArrayList<>(omMap.values());
+ this.activeOMs = activeOMList;
+ this.inactiveOMs = inactiveOMList;
}
/**
@@ -83,6 +91,10 @@ public final class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
return this.ozoneManagers.get(0);
}
+ public boolean isOMActive(String omNodeId) {
+ return activeOMs.contains(ozoneManagerMap.get(omNodeId));
+ }
+
public OzoneManager getOzoneManager(int index) {
return this.ozoneManagers.get(index);
}
@@ -91,6 +103,20 @@ public final class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
return this.ozoneManagerMap.get(omNodeId);
}
+ /**
+ * Start a previously inactive OM.
+ */
+ public void startInactiveOM(String omNodeID) throws IOException {
+ OzoneManager ozoneManager = ozoneManagerMap.get(omNodeID);
+ if (!inactiveOMs.contains(ozoneManager)) {
+ throw new IOException("OM is already active.");
+ } else {
+ ozoneManager.start();
+ activeOMs.add(ozoneManager);
+ inactiveOMs.remove(ozoneManager);
+ }
+ }
+
@Override
public void restartOzoneManager() throws IOException {
for (OzoneManager ozoneManager : ozoneManagers) {
@@ -125,6 +151,8 @@ public final class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
public static class Builder extends MiniOzoneClusterImpl.Builder {
private final String nodeIdBaseStr = "omNode-";
+ private List<OzoneManager> activeOMs = new ArrayList<>();
+ private List<OzoneManager> inactiveOMs = new ArrayList<>();
/**
* Creates a new Builder.
@@ -137,6 +165,10 @@ public final class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
@Override
public MiniOzoneCluster build() throws IOException {
+ if (numOfActiveOMs > numOfOMs) {
+ throw new IllegalArgumentException("Number of active OMs cannot be " +
+ "more than the total number of OMs");
+ }
DefaultMetricsSystem.setMiniClusterMode(true);
initializeConfiguration();
StorageContainerManager scm;
@@ -150,8 +182,8 @@ public final class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
}
final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
- MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omMap,
- scm, hddsDatanodes);
+ MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(
+ conf, omMap, activeOMs, inactiveOMs, scm, hddsDatanodes);
if (startDataNodes) {
cluster.startHddsDatanodes();
}
@@ -215,9 +247,16 @@ public final class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
om.setCertClient(certClient);
omMap.put(nodeId, om);
- om.start();
- LOG.info("Started OzoneManager RPC server at " +
- om.getOmRpcServerAddr());
+ if (i <= numOfActiveOMs) {
+ om.start();
+ activeOMs.add(om);
+ LOG.info("Started OzoneManager RPC server at " +
+ om.getOmRpcServerAddr());
+ } else {
+ inactiveOMs.add(om);
+ LOG.info("Intialized OzoneManager at " + om.getOmRpcServerAddr()
+ + ". This OM is currently inactive (not running).");
+ }
}
// Set default OM address to point to the first OM. Clients would
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
new file mode 100644
index 0000000..6ac28c3
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.utils.db.DBCheckpoint;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.om.TestOzoneManagerHA.createKey;
+
+/**
+ * Tests installing a leader OM's Ratis snapshot on a lagging follower OM.
+ */
+public class TestOMRatisSnapshots {
+
+ private MiniOzoneHAClusterImpl cluster = null;
+ private ObjectStore objectStore;
+ private OzoneConfiguration conf;
+ private String clusterId;
+ private String scmId;
+ private int numOfOMs = 3;
+ private static final long SNAPSHOT_THRESHOLD = 50;
+ private static final int LOG_PURGE_GAP = 50;
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @Rule
+ public Timeout timeout = new Timeout(500_000);
+
+ /**
+ * Create a MiniOzoneCluster for testing with three OMs, only two of which
+ * are started. The third OM stays inactive until a test starts it via
+ * {@link MiniOzoneHAClusterImpl#startInactiveOM(String)}.
+ *
+ * @throws Exception if the cluster or the client cannot be created
+ */
+ @Before
+ public void init() throws Exception {
+ conf = new OzoneConfiguration();
+ clusterId = UUID.randomUUID().toString();
+ scmId = UUID.randomUUID().toString();
+ conf.setLong(
+ OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
+ SNAPSHOT_THRESHOLD);
+ conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
+ cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
+ .setClusterId(clusterId)
+ .setScmId(scmId)
+ .setOMServiceId("om-service-test1")
+ .setNumOfOzoneManagers(numOfOMs)
+ .setNumOfActiveOMs(2)
+ .build();
+ cluster.waitForClusterToBeReady();
+ objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
+ }
+
+ /**
+ * Shut down the MiniOzoneCluster, if it was created.
+ */
+ @After
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testInstallSnapshot() throws Exception {
+ // Get the leader OM, as tracked by the client's OM proxy provider
+ String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider()
+ .getCurrentProxyOMNodeId();
+ OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+ OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+
+ // Find the inactive OM among the leader's two peers
+ String followerNodeId = leaderOM.getPeerNodes().get(0).getOMNodeId();
+ if (cluster.isOMActive(followerNodeId)) {
+ followerNodeId = leaderOM.getPeerNodes().get(1).getOMNodeId();
+ }
+ OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+ // Do some transactions so that the log index increases
+ String userName = "user" + RandomStringUtils.randomNumeric(5);
+ String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+ String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner(userName)
+ .setAdmin(adminName)
+ .build();
+
+ objectStore.createVolume(volumeName, createVolumeArgs);
+ OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+ retVolumeinfo.createBucket(bucketName);
+ OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+ long leaderOMappliedLogIndex =
+ leaderRatisServer.getStateMachineLastAppliedIndex();
+ leaderOM.getOmRatisServer().getStateMachineLastAppliedIndex(); // NOTE(review): result unused
+
+ List<String> keys = new ArrayList<>();
+ while (leaderOMappliedLogIndex < 2000) {
+ keys.add(createKey(ozoneBucket));
+ leaderOMappliedLogIndex =
+ leaderRatisServer.getStateMachineLastAppliedIndex();
+ }
+
+ // Flush and snapshot the leader OM, then take a DB checkpoint from it.
+ long leaderOMSnaphsotIndex = leaderOM.saveRatisSnapshot(true);
+ DBCheckpoint leaderDbCheckpoint =
+ leaderOM.getMetadataManager().getStore().getCheckpoint(false);
+
+ // Start the inactive OM
+ cluster.startInactiveOM(followerNodeId);
+
+ // The recently started OM should be lagging behind the leader OM.
+ long followerOMLastAppliedIndex =
+ followerOM.getOmRatisServer().getStateMachineLastAppliedIndex();
+ Assert.assertTrue(
+ followerOMLastAppliedIndex < leaderOMSnaphsotIndex);
+
+ // Install leader OM's db checkpoint on the lagging OM.
+ followerOM.getOmRatisServer().getOmStateMachine().pause();
+ followerOM.getMetadataManager().getStore().close();
+ followerOM.replaceOMDBWithCheckpoint(
+ leaderOMSnaphsotIndex, leaderDbCheckpoint.getCheckpointLocation());
+
+ // Reload the follower OM with new DB checkpoint from the leader OM.
+ followerOM.reloadOMState(leaderOMSnaphsotIndex);
+ followerOM.getOmRatisServer().getOmStateMachine().unpause(
+ leaderOMSnaphsotIndex);
+
+ // After the new checkpoint is loaded and state machine is unpaused, the
+ // follower OM lastAppliedIndex must match the snapshot index of the
+ // checkpoint.
+ followerOMLastAppliedIndex = followerOM.getOmRatisServer()
+ .getStateMachineLastAppliedIndex();
+ Assert.assertEquals(leaderOMSnaphsotIndex, followerOMLastAppliedIndex);
+
+ // Verify that the follower OM's DB contains the transactions which were
+ // made while it was inactive.
+ OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
+ Assert.assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+ followerOMMetaMngr.getVolumeKey(volumeName)));
+ Assert.assertNotNull(followerOMMetaMngr.getBucketTable().get(
+ followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
+ for (String key : keys) {
+ Assert.assertNotNull(followerOMMetaMngr.getKeyTable().get(
+ followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+ }
+ }
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 05c53b3..92fc263 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -829,7 +829,11 @@ public class TestOzoneManagerHA {
}
- private void createKey(OzoneBucket ozoneBucket) throws IOException {
+ /**
+ * Create a key in the bucket.
+ * @return the key name.
+ */
+ static String createKey(OzoneBucket ozoneBucket) throws IOException {
String keyName = "key" + RandomStringUtils.randomNumeric(5);
String data = "data" + RandomStringUtils.randomNumeric(5);
OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName,
@@ -837,5 +841,6 @@ public class TestOzoneManagerHA {
ReplicationFactor.ONE, new HashMap<>());
ozoneOutputStream.write(data.getBytes(), 0, data.length());
ozoneOutputStream.close();
+ return keyName;
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index d54e121..b36a128 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -126,7 +126,7 @@ public class OMDBCheckpointServlet extends HttpServlet {
// ratis snapshot first. This step also included flushing the OM DB.
// Hence, we can set flush to false.
flush = false;
- ratisSnapshotIndex = om.saveRatisSnapshot();
+ ratisSnapshotIndex = om.saveRatisSnapshot(true);
} else {
ratisSnapshotIndex = om.loadRatisSnapshotIndex();
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 46fdabd..dbadf68 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -196,15 +196,18 @@ public class OMMetrics {
}
public void setNumVolumes(long val) {
- this.numVolumes.incr(val);
+ long oldVal = this.numVolumes.value();
+ this.numVolumes.incr(val - oldVal);
}
public void setNumBuckets(long val) {
- this.numBuckets.incr(val);
+ long oldVal = this.numBuckets.value();
+ this.numBuckets.incr(val - oldVal);
}
public void setNumKeys(long val) {
- this.numKeys.incr(val);
+ long oldVal = this.numKeys.value();
+ this.numKeys.incr(val- oldVal);
}
public long getNumVolumes() {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 4312516..0267350 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
import java.net.InetAddress;
+import java.nio.file.Path;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.KeyPair;
@@ -143,6 +144,10 @@ import org.apache.hadoop.util.KMSUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.utils.RetriableTask;
+import org.apache.hadoop.utils.db.DBCheckpoint;
+import org.apache.hadoop.utils.db.DBStore;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.LifeCycle;
import org.bouncycastle.pkcs.PKCS10CertificationRequest;
import org.slf4j.Logger;
@@ -236,18 +241,20 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
private RPC.Server omRpcServer;
private InetSocketAddress omRpcAddress;
private String omId;
- private final OMMetadataManager metadataManager;
- private final VolumeManager volumeManager;
- private final BucketManager bucketManager;
- private final KeyManager keyManager;
- private final PrefixManagerImpl prefixManager;
+
+ private OMMetadataManager metadataManager;
+ private VolumeManager volumeManager;
+ private BucketManager bucketManager;
+ private KeyManager keyManager;
+ private PrefixManagerImpl prefixManager;
+ private S3BucketManager s3BucketManager;
+
private final OMMetrics metrics;
private OzoneManagerHttpServer httpServer;
private final OMStorage omStorage;
private final ScmBlockLocationProtocol scmBlockClient;
private final StorageContainerLocationProtocol scmContainerClient;
private ObjectName omInfoBeanName;
- private final S3BucketManager s3BucketManager;
private Timer metricsTimer;
private ScheduleOMMetricsWriteTask scheduleOMMetricsWriteTask;
private static final ObjectWriter WRITER =
@@ -258,7 +265,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
private final Runnable shutdownHook;
private final File omMetaDir;
private final boolean isAclEnabled;
- private final IAccessAuthorizer accessAuthorizer;
+ private IAccessAuthorizer accessAuthorizer;
private JvmPauseMonitor jvmPauseMonitor;
private final SecurityConfig secConfig;
private S3SecretManager s3SecretManager;
@@ -308,12 +315,37 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
throw new OMException("OM not initialized.",
ResultCodes.OM_NOT_INITIALIZED);
}
+
+ // Read configuration and set values.
+ ozAdmins = conf.getTrimmedStringCollection(OZONE_ADMINISTRATORS);
+ omMetaDir = OmUtils.getOmDbDir(configuration);
+ this.isAclEnabled = conf.getBoolean(OZONE_ACL_ENABLED,
+ OZONE_ACL_ENABLED_DEFAULT);
+ this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
+ OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
+ this.preallocateBlocksMax = conf.getInt(
+ OZONE_KEY_PREALLOCATION_BLOCKS_MAX,
+ OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT);
+ this.grpcBlockTokenEnabled = conf.getBoolean(HDDS_BLOCK_TOKEN_ENABLED,
+ HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
+ this.useRatisForReplication = conf.getBoolean(
+ DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
+ // TODO: This is a temporary check. Once fully implemented, all OM state
+ // change should go through Ratis - be it standalone (for non-HA) or
+ // replicated (for HA).
+ isRatisEnabled = configuration.getBoolean(
+ OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
+
// Load HA related configurations
loadOMHAConfigs(configuration);
+ InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
+ omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
scmContainerClient = getScmContainerClient(configuration);
// verifies that the SCM info in the OM Version file is correct.
scmBlockClient = getScmBlockClient(configuration);
+ this.scmClient = new ScmClient(scmBlockClient, scmContainerClient);
// For testing purpose only, not hit scm from om as Hadoop UGI can't login
// two principals in the same JVM.
@@ -329,16 +361,32 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
- metadataManager = new OmMetadataManagerImpl(configuration);
+ secConfig = new SecurityConfig(configuration);
+ // Create the KMS Key Provider
+ try {
+ kmsProvider = createKeyProviderExt(configuration);
+ } catch (IOException ioe) {
+ kmsProvider = null;
+ LOG.error("Fail to create Key Provider");
+ }
+ if (secConfig.isSecurityEnabled()) {
+ omComponent = OM_DAEMON + "-" + omId;
+ if(omStorage.getOmCertSerialId() == null) {
+ throw new RuntimeException("OzoneManager started in secure mode but " +
+ "doesn't have SCM signed certificate.");
+ }
+ certClient = new OMCertificateClient(new SecurityConfig(conf),
+ omStorage.getOmCertSerialId());
+ }
+ if (secConfig.isBlockTokenEnabled()) {
+ blockTokenMgr = createBlockTokenSecretManager(configuration);
+ }
+
+ instantiateServices();
+
+ initializeRatisServer();
+ initializeRatisClient();
- // This is a temporary check. Once fully implemented, all OM state change
- // should go through Ratis - be it standalone (for non-HA) or replicated
- // (for HA).
- isRatisEnabled = configuration.getBoolean(
- OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
- OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
- startRatisServer();
- startRatisClient();
if (isRatisEnabled) {
// Create Ratis storage dir
String omRatisDirectory = OmUtils.getOMRatisDirectory(configuration);
@@ -361,59 +409,44 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
OM_RATIS_SNAPSHOT_INDEX);
this.snapshotIndex = loadRatisSnapshotIndex();
- InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
- omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
- secConfig = new SecurityConfig(configuration);
- volumeManager = new VolumeManagerImpl(metadataManager, configuration);
+ metrics = OMMetrics.create();
- // Create the KMS Key Provider
- try {
- kmsProvider = createKeyProviderExt(configuration);
- } catch (IOException ioe) {
- kmsProvider = null;
- LOG.error("Fail to create Key Provider");
- }
+ // Start Om Rpc Server.
+ omRpcServer = getRpcServer(conf);
+ omRpcAddress = updateRPCListenAddress(configuration,
+ OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
+
+ shutdownHook = () -> {
+ saveOmMetrics();
+ };
+ ShutdownHookManager.get().addShutdownHook(shutdownHook,
+ SHUTDOWN_HOOK_PRIORITY);
+ }
+
+ /**
+ * Instantiate services which are dependent on the OM DB state.
+ * When OM state is reloaded, these services are re-initialized with the
+ * new OM state.
+ */
+ private void instantiateServices() throws IOException {
+ metadataManager = new OmMetadataManagerImpl(configuration);
+ volumeManager = new VolumeManagerImpl(metadataManager, configuration);
bucketManager = new BucketManagerImpl(metadataManager, getKmsProvider(),
isRatisEnabled);
- metrics = OMMetrics.create();
-
s3BucketManager = new S3BucketManagerImpl(configuration, metadataManager,
volumeManager, bucketManager);
if (secConfig.isSecurityEnabled()) {
- omComponent = OM_DAEMON + "-" + omId;
- if(omStorage.getOmCertSerialId() == null) {
- throw new RuntimeException("OzoneManager started in secure mode but " +
- "doesn't have SCM signed certificate.");
- }
- certClient = new OMCertificateClient(new SecurityConfig(conf),
- omStorage.getOmCertSerialId());
s3SecretManager = new S3SecretManagerImpl(configuration,
metadataManager);
delegationTokenMgr = createDelegationTokenSecretManager(configuration);
}
- if (secConfig.isBlockTokenEnabled()) {
- blockTokenMgr = createBlockTokenSecretManager(configuration);
- }
-
- omRpcServer = getRpcServer(conf);
- omRpcAddress = updateRPCListenAddress(configuration,
- OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
-
- this.scmClient = new ScmClient(scmBlockClient, scmContainerClient);
prefixManager = new PrefixManagerImpl(metadataManager);
keyManager = new KeyManagerImpl(this, scmClient, configuration,
omStorage.getOmId());
- shutdownHook = () -> {
- saveOmMetrics();
- };
- ShutdownHookManager.get().addShutdownHook(shutdownHook,
- SHUTDOWN_HOOK_PRIORITY);
- isAclEnabled = conf.getBoolean(OZONE_ACL_ENABLED,
- OZONE_ACL_ENABLED_DEFAULT);
if (isAclEnabled) {
- accessAuthorizer = getACLAuthorizerInstance(conf);
+ accessAuthorizer = getACLAuthorizerInstance(configuration);
if (accessAuthorizer instanceof OzoneNativeAuthorizer) {
OzoneNativeAuthorizer authorizer =
(OzoneNativeAuthorizer) accessAuthorizer;
@@ -425,17 +458,6 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
} else {
accessAuthorizer = null;
}
- ozAdmins = conf.getTrimmedStringCollection(OZONE_ADMINISTRATORS);
- omMetaDir = OmUtils.getOmDbDir(configuration);
- this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
- OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
- this.preallocateBlocksMax = conf.getInt(
- OZONE_KEY_PREALLOCATION_BLOCKS_MAX,
- OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT);
- this.grpcBlockTokenEnabled = conf.getBoolean(HDDS_BLOCK_TOKEN_ENABLED,
- HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
- this.useRatisForReplication = conf.getBoolean(
- DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
}
/**
@@ -1235,6 +1257,14 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
DefaultMetricsSystem.initialize("OzoneManager");
+ // Start Ratis services
+ if (omRatisServer != null) {
+ omRatisServer.start();
+ }
+ if (omRatisClient != null) {
+ omRatisClient.connect();
+ }
+
metadataManager.start(configuration);
startSecretManagerIfNecessary();
@@ -1305,8 +1335,14 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
omRpcServer.start();
isOmRpcServerRunning = true;
- startRatisServer();
- startRatisClient();
+ initializeRatisServer();
+ if (omRatisServer != null) {
+ omRatisServer.start();
+ }
+ initializeRatisClient();
+ if (omRatisClient != null) {
+ omRatisClient.connect();
+ }
try {
httpServer = new OzoneManagerHttpServer(configuration, this);
@@ -1353,15 +1389,13 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
/**
* Creates an instance of ratis server.
*/
- private void startRatisServer() throws IOException {
+ private void initializeRatisServer() throws IOException {
if (isRatisEnabled) {
if (omRatisServer == null) {
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(
configuration, this, omNodeDetails, peerNodes);
}
- omRatisServer.start();
-
- LOG.info("OzoneManager Ratis server started at port {}",
+ LOG.info("OzoneManager Ratis server initialized at port {}",
omRatisServer.getServerPort());
} else {
omRatisServer = null;
@@ -1371,14 +1405,13 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
/**
* Creates an instance of ratis client.
*/
- private void startRatisClient() throws IOException {
+ private void initializeRatisClient() throws IOException {
if (isRatisEnabled) {
if (omRatisClient == null) {
omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
omNodeDetails.getOMNodeId(), omRatisServer.getRaftGroup(),
configuration);
}
- omRatisClient.connect();
} else {
omRatisClient = null;
}
@@ -1398,11 +1431,13 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
}
@Override
- public long saveRatisSnapshot() throws IOException {
+ public long saveRatisSnapshot(boolean flush) throws IOException {
snapshotIndex = omRatisServer.getStateMachineLastAppliedIndex();
- // Flush the OM state to disk
- getMetadataManager().getStore().flush();
+ if (flush) {
+ // Flush the OM state to disk
+ metadataManager.getStore().flush();
+ }
PersistentLongFile.writeFile(ratisSnapshotFile, snapshotIndex);
LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}",
@@ -2697,7 +2732,6 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
}
}
}
-
@Override
public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
IOException {
@@ -3069,6 +3103,179 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
}
}
+  /**
+   * Download and install latest checkpoint from leader OM.
+   * If the downloaded checkpoint's snapshot index is greater than this OM's
+   * last applied transaction index, then re-initialize the OM state via this
+   * checkpoint. Before re-initializing OM state, the OM state machine is
+   * paused so that no new transactions can be applied.
+   * @param leaderId peerNodeID of the leader OM
+   * @return If checkpoint is installed, return the corresponding termIndex.
+   * Otherwise (no snapshot provider, download failure, stale checkpoint, or
+   * a failure while replacing/reloading the DB), return null.
+   */
+  public TermIndex installSnapshot(String leaderId) {
+    if (omSnapshotProvider == null) {
+      LOG.error("OM Snapshot Provider is not configured as there are no peer " +
+          "nodes.");
+      return null;
+    }
+
+    DBCheckpoint omDBcheckpoint = getDBCheckpointFromLeader(leaderId);
+    if (omDBcheckpoint == null) {
+      // getDBCheckpointFromLeader() logs the cause and returns null when the
+      // download fails; there is nothing to install.
+      return null;
+    }
+    Path newDBlocation = omDBcheckpoint.getCheckpointLocation();
+
+    // Check if current ratis log index is smaller than the downloaded
+    // snapshot index. If yes, proceed by pausing the state machine so that
+    // the OM state can be re-initialized. If no, then do not proceed with
+    // installSnapshot.
+    long lastAppliedIndex = omRatisServer.getStateMachineLastAppliedIndex();
+    long checkpointSnapshotIndex = omDBcheckpoint.getRatisSnapshotIndex();
+    if (checkpointSnapshotIndex <= lastAppliedIndex) {
+      LOG.error("Failed to install checkpoint from OM leader: {}. The last " +
+          "applied index: {} is greater than or equal to the checkpoint's " +
+          "snapshot index: {}. Deleting the downloaded checkpoint {}", leaderId,
+          lastAppliedIndex, checkpointSnapshotIndex, newDBlocation);
+      try {
+        FileUtils.deleteFully(newDBlocation);
+      } catch (IOException e) {
+        LOG.error("Failed to fully delete the downloaded DB checkpoint {} " +
+            "from OM leader {}.", newDBlocation, leaderId, e);
+      }
+      return null;
+    }
+
+    // Pause the State Machine so that no new transactions can be applied.
+    // This action also clears the OM Double Buffer so that if there are any
+    // pending transactions in the buffer, they are discarded.
+    // TODO: The Ratis server should also be paused here. This is required
+    // because a leader election might happen while the snapshot
+    // installation is in progress and the new leader might start sending
+    // append log entries to the ratis server.
+    omRatisServer.getOmStateMachine().pause();
+
+    File dbBackup;
+    try {
+      dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex, newDBlocation);
+    } catch (Exception e) {
+      // NOTE(review): on this path the state machine stays paused and the DB
+      // store stays closed; no recovery is attempted here.
+      LOG.error("OM DB checkpoint replacement with new downloaded checkpoint " +
+          "failed.", e);
+      return null;
+    }
+
+    // Reload the OM DB store with the new checkpoint.
+    // Restart (unpause) the state machine and update its last applied index
+    // to the installed checkpoint's snapshot index.
+    try {
+      reloadOMState(checkpointSnapshotIndex);
+      omRatisServer.getOmStateMachine().unpause(checkpointSnapshotIndex);
+    } catch (IOException e) {
+      LOG.error("Failed to reload OM state with new DB checkpoint.", e);
+      return null;
+    }
+
+    // Delete the backup DB. Failure here is not fatal: the new state is
+    // already installed, so only log it (with the cause).
+    try {
+      FileUtils.deleteFully(dbBackup);
+    } catch (IOException e) {
+      LOG.error("Failed to delete the backup of the original DB {}",
+          dbBackup, e);
+    }
+
+    // TODO: We should only return the snapshotIndex to the leader.
+    // Should be fixed after RATIS-586
+    return TermIndex.newTermIndex(0, checkpointSnapshotIndex);
+  }
+
+  /**
+   * Fetches the most recent OM DB checkpoint from the leader OM via the
+   * configured snapshot provider.
+   * @param leaderId OMNodeID of the leader OM node.
+   * @return the leader's latest DB checkpoint, or null if the download threw
+   * an IOException (the failure is logged).
+   */
+  private DBCheckpoint getDBCheckpointFromLeader(String leaderId) {
+    LOG.info("Downloading checkpoint from leader OM {} and reloading state " +
+        "from the checkpoint.", leaderId);
+    DBCheckpoint downloaded = null;
+    try {
+      downloaded = omSnapshotProvider.getOzoneManagerDBSnapshot(leaderId);
+    } catch (IOException ioe) {
+      LOG.error("Failed to download checkpoint from OM leader {}", leaderId,
+          ioe);
+    }
+    return downloaded;
+  }
+
+  /**
+   * Replace the current OM DB with the new DB checkpoint.
+   * The current DB store is closed, and its directory is renamed to a sibling
+   * backup directory. The name is built from OM_DB_BACKUP_PREFIX, the last
+   * applied index and the current time. The checkpoint directory is then
+   * moved into the DB's original location. If that move fails, the backup is
+   * moved back before the failure is rethrown.
+   * @param lastAppliedIndex the last applied index in the current OM DB.
+   * @param checkpointPath path to the new DB checkpoint
+   * @return location of the backup of the original DB
+   * @throws Exception if closing the store or moving either directory fails;
+   * a failure to restore the backup is attached to the rethrown exception as
+   * a suppressed exception.
+   */
+  File replaceOMDBWithCheckpoint(long lastAppliedIndex, Path checkpointPath)
+      throws Exception {
+    // Stop the DB first
+    DBStore store = metadataManager.getStore();
+    store.close();
+
+    // Take a backup of the current DB
+    File db = store.getDbLocation();
+    String dbBackupName = OzoneConsts.OM_DB_BACKUP_PREFIX +
+        lastAppliedIndex + "_" + System.currentTimeMillis();
+    File dbBackup = new File(db.getParentFile(), dbBackupName);
+
+    try {
+      Files.move(db.toPath(), dbBackup.toPath());
+    } catch (IOException e) {
+      LOG.error("Failed to create a backup of the current DB. Aborting " +
+          "snapshot installation.", e);
+      throw e;
+    }
+
+    // Move the new DB checkpoint into the om metadata dir
+    try {
+      Files.move(checkpointPath, db.toPath());
+    } catch (IOException e) {
+      LOG.error("Failed to move downloaded DB checkpoint {} to metadata " +
+          "directory {}. Resetting to original DB.", checkpointPath,
+          db.toPath(), e);
+      try {
+        Files.move(dbBackup.toPath(), db.toPath());
+      } catch (IOException restoreFailure) {
+        // Keep the original move failure as the primary exception; without
+        // this, the restore failure would replace (and lose) it.
+        e.addSuppressed(restoreFailure);
+      }
+      throw e;
+    }
+    return dbBackup;
+  }
+
+  /**
+   * Re-instantiate MetadataManager with new DB checkpoint.
+   * All the classes which use/ store MetadataManager should also be updated
+   * with the new MetadataManager instance.
+   * @param newSnapshotIndex ratis snapshot index of the installed checkpoint;
+   * it is stored as this OM's snapshot index and persisted to the ratis
+   * snapshot file.
+   * @throws IOException if restarting services, rewriting the metrics file
+   * or writing the snapshot index file fails.
+   */
+  void reloadOMState(long newSnapshotIndex) throws IOException {
+
+    instantiateServices();
+
+    // Restart required services
+    metadataManager.start(configuration);
+    keyManager.start(configuration);
+
+    // Set metrics from the row counts of the new DB
+    metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
+        .getVolumeTable()));
+    metrics.setNumBuckets(metadataManager.countRowsInTable(metadataManager
+        .getBucketTable()));
+
+    // Delete the omMetrics file if it exists and save a new metrics file
+    // with new data
+    Files.deleteIfExists(getMetricsStorageFile().toPath());
+    saveOmMetrics();
+
+    // Update OM snapshot index with the new snapshot index (from the new OM
+    // DB state) and save the snapshot index to disk.
+    // saveRatisSnapshot() must not be used here: it overwrites snapshotIndex
+    // with omRatisServer.getStateMachineLastAppliedIndex(). When called from
+    // installSnapshot(), the state machine is still paused at that point and
+    // reports the pre-install index. Its index is only updated later by
+    // unpause().
+    this.snapshotIndex = newSnapshotIndex;
+    PersistentLongFile.writeFile(ratisSnapshotFile, snapshotIndex);
+    LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}",
+        snapshotIndex);
+  }
+
+ /**
+ * Returns the static {@code LOG} logger of this class.
+ */
public static Logger getLogger() {
return LOG;
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 49a84da..1e51273 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -308,10 +308,15 @@ public final class OzoneManagerRatisServer {
}
/**
- * Returns OzoneManager StateMachine.
+ * Creates and returns a new OzoneManager StateMachine bound to this server.
+ * A new instance is constructed on every call.
*/
private OzoneManagerStateMachine getStateMachine() {
- return new OzoneManagerStateMachine(this);
+ return new OzoneManagerStateMachine(this);
+ }
+
+ /**
+ * Returns the state machine instance held in this server's
+ * {@code omStateMachine} field. Unlike getStateMachine(), no new instance
+ * is created.
+ */
+ @VisibleForTesting
+ public OzoneManagerStateMachine getOmStateMachine() {
+ return omStateMachine;
}
public OzoneManager getOzoneManager() {
@@ -387,6 +392,12 @@ public final class OzoneManagerRatisServer {
SizeInBytes.valueOf(logAppenderQueueByteLimit));
RaftServerConfigKeys.Log.setPreallocatedSize(properties,
SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+ RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
+ false);
+ final int logPurgeGap = conf.getInt(
+ OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP,
+ OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT);
+ RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap);
// For grpc set the maximum message size
// TODO: calculate the optimal max message size
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 31c467d..c51323e 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.ContainerStateMachine;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
@@ -43,12 +44,15 @@ import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,8 +72,9 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
private OzoneManagerHARequestHandler handler;
private RaftGroupId raftGroupId;
private long lastAppliedIndex = 0;
- private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
+ private OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
private final ExecutorService executorService;
+ private final ExecutorService installSnapshotExecutor;
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer;
@@ -82,19 +87,20 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build();
this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
+ this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor();
}
/**
* Initializes the State Machine with the given server, group and storage.
+ * The base initialization, group id assignment and storage init run inside
+ * lifeCycle.startAndTransition(...), so they are tracked by the state
+ * machine's LifeCycle. Presumably this is STARTING then RUNNING on success;
+ * confirm against Ratis LifeCycle.
- * TODO: Load the latest snapshot from the file system.
*/
@Override
- public void initialize(
- RaftServer server, RaftGroupId id, RaftStorage raftStorage)
- throws IOException {
- super.initialize(server, id, raftStorage);
- this.raftGroupId = id;
- storage.init(raftStorage);
+ public void initialize(RaftServer server, RaftGroupId id,
+ RaftStorage raftStorage) throws IOException {
+ lifeCycle.startAndTransition(() -> {
+ super.initialize(server, id, raftStorage);
+ this.raftGroupId = id;
+ storage.init(raftStorage);
+ });
}
/**
@@ -185,6 +191,27 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
}
}
+ /**
+ * Pauses the StateMachine: moves its life cycle to PAUSING and then PAUSED,
+ * and stops the OM double buffer. The double buffer is replaced with a new
+ * instance by unpause(long).
+ */
+ @Override
+ public void pause() {
+ lifeCycle.transition(LifeCycle.State.PAUSING);
+ lifeCycle.transition(LifeCycle.State.PAUSED);
+ ozoneManagerDoubleBuffer.stop();
+ }
+
+ /**
+ * Unpause the StateMachine, re-initialize the DoubleBuffer and update the
+ * lastAppliedIndex. This should be done after installing new state (e.g. a
+ * DB checkpoint) into the StateMachine.
+ * The new double buffer is built on ozoneManager.getMetadataManager(), so
+ * the OM's metadata manager must already point at the new DB.
+ * @param newLastAppliedSnaphsotIndex index to record as the last applied
+ * index (the installed snapshot's index).
+ */
+ public void unpause(long newLastAppliedSnaphsotIndex) {
+ lifeCycle.startAndTransition(() -> {
+ this.ozoneManagerDoubleBuffer =
+ new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(),
+ this::updateLastAppliedIndex);
+ this.updateLastAppliedIndex(newLastAppliedSnaphsotIndex);
+ });
+ }
+
/**
* Take OM Ratis snapshot. Write the snapshot index to file. Snapshot index
* is the log index corresponding to the last applied transaction on the OM
@@ -197,12 +224,45 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
public long takeSnapshot() throws IOException {
LOG.info("Saving Ratis snapshot on the OM.");
if (ozoneManager != null) {
- return ozoneManager.saveRatisSnapshot();
+ // flush=true: flush the OM DB store before the snapshot index is written.
+ return ozoneManager.saveRatisSnapshot(true);
}
+ // No OzoneManager attached: nothing to snapshot, report index 0.
return 0;
}
/**
+   * Leader OM has purged entries from its log. To catch up, OM must download
+   * the latest checkpoint from the leader OM and install it.
+   * @param roleInfoProto role information of this (receiving) Ratis server.
+   *                      The leader's id is read from its follower info.
+   * @param firstTermIndexInLog TermIndex of the first append entry available
+   *                            in the Leader's log.
+   * @return the last term index included in the installed snapshot.
+   */
+  @Override
+  public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
+      RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
+
+    // Ratis passes the RoleInfoProto of the local server, which is the
+    // follower being told to catch up. Its "self" peer is therefore this OM,
+    // not the leader, and its role is FOLLOWER. The leader's id has to be
+    // taken from the follower info.
+    if (!roleInfoProto.hasFollowerInfo()
+        || !roleInfoProto.getFollowerInfo().hasLeaderInfo()) {
+      String selfNodeId = RaftPeerId.valueOf(
+          roleInfoProto.getSelf().getId()).toString();
+      LOG.error("Received Install Snapshot notification on OM node: {} with " +
+          "role: {} and no known leader. Ignoring the notification.",
+          selfNodeId, roleInfoProto.getRole());
+      return completeExceptionally(new OMException("Received notification to "
+          + "install snapshot without leader OM information",
+          OMException.ResultCodes.RATIS_ERROR));
+    }
+
+    String leaderNodeId = RaftPeerId.valueOf(roleInfoProto.getFollowerInfo()
+        .getLeaderInfo().getId().getId()).toString();
+
+    LOG.info("Received install snapshot notificaiton form OM leader: {} with "
+        + "term index: {}", leaderNodeId, firstTermIndexInLog);
+
+    // Download/install on a dedicated executor so the Ratis thread that
+    // delivered the notification is not blocked.
+    return CompletableFuture.supplyAsync(
+        () -> ozoneManager.installSnapshot(leaderNodeId),
+        installSnapshotExecutor);
+  }
+
+ /**
* Notifies the state machine that the raft peer is no longer leader.
*/
@Override
@@ -276,10 +336,9 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
this.raftGroupId = raftGroupId;
}
-
+ /**
+ * Stops the OM double buffer, then shuts down the apply-transaction executor
+ * and the install-snapshot executor via HadoopExecutors.shutdown with a
+ * 5 second timeout each.
+ */
public void stop() {
ozoneManagerDoubleBuffer.stop();
HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
+ HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5,
TimeUnit.SECONDS);
}
-
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
index e1d4889..87446db 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
@@ -149,7 +149,7 @@ public class OzoneManagerSnapshotProvider {
* @param leaderOMNodeID leader OM Node ID.
* @return the DB checkpoint (including the ratis snapshot index)
*/
- protected DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID)
+ public DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID)
throws IOException {
String snapshotFileName = OM_SNAPSHOT_DB + "_" +
System.currentTimeMillis();
File targetFile = new File(omSnapshotDir, snapshotFileName + ".tar.gz");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]