Repository: hadoop
Updated Branches:
  refs/heads/trunk 52cb766ad -> 82919a1e7
HDDS-638. Enable ratis snapshots for HDDS datanodes. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/82919a1e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/82919a1e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/82919a1e

Branch: refs/heads/trunk
Commit: 82919a1e7af2ec22993f273e9ee64512a688c99b
Parents: 52cb766
Author: Mukul Kumar Singh <msi...@apache.org>
Authored: Mon Oct 22 19:59:35 2018 +0530
Committer: Mukul Kumar Singh <msi...@apache.org>
Committed: Mon Oct 22 19:59:35 2018 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hdds/scm/ScmConfigKeys.java   |   4 +
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |   4 +
 .../common/src/main/resources/ozone-default.xml |   8 ++
 .../server/ratis/ContainerStateMachine.java     |  85 +++++++++++--
 .../server/ratis/XceiverServerRatis.java        |  14 ++-
 .../container/ozoneimpl/OzoneContainer.java     |   5 +
 .../freon/TestFreonWithDatanodeRestart.java     | 122 +++++++++++++++++++
 7 files changed, 233 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/82919a1e/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 5910eda..f95b748 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -104,6 +104,10 @@ public final class ScmConfigKeys {
       DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
       TimeDuration.valueOf(1, TimeUnit.SECONDS);
 
+  public static final String DFS_RATIS_SNAPSHOT_THRESHOLD_KEY =
+      "dfs.ratis.snapshot.threshold";
+  public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 10000;
+
   public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
       "dfs.ratis.server.failure.duration";
   public static final TimeDuration

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82919a1e/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 09d798f..c931dcf 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -266,6 +266,10 @@ public final class OzoneConfigKeys {
   public static final TimeDuration
       DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
       ScmConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT;
+  public static final String DFS_RATIS_SNAPSHOT_THRESHOLD_KEY =
+      ScmConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY;
+  public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT =
+      ScmConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT;
 
   public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
       ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY;
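
The new dfs.ratis.snapshot.threshold key introduced above is an ordinary Ozone
configuration property, so it can be overridden in ozone-site.xml or
programmatically before the datanode's Ratis server is built. A minimal sketch
of a programmatic override; the class name and the value 1000 are illustrative
only and not part of this change:

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.OzoneConfigKeys;

    public class SnapshotThresholdExample {
      public static void main(String[] args) {
        // Loads ozone-default.xml / ozone-site.xml like any other Ozone component.
        OzoneConfiguration conf = new OzoneConfiguration();

        // Snapshot every 1000 applied transactions instead of the default 10000.
        conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1000L);

        long threshold = conf.getLong(
            OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY,
            OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT);
        System.out.println("dfs.ratis.snapshot.threshold = " + threshold);
      }
    }
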
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82919a1e/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index b2d47c9..237f8d8 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -119,6 +119,14 @@
     </description>
   </property>
   <property>
+    <name>dfs.ratis.snapshot.threshold</name>
+    <value>10000</value>
+    <tag>OZONE, RATIS</tag>
+    <description>Number of transactions after which a ratis snapshot should be
+      taken.
+    </description>
+  </property>
+  <property>
     <name>dfs.container.ratis.num.write.chunk.threads</name>
     <value>60</value>
     <tag>OZONE, RATIS, PERFORMANCE</tag>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82919a1e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 7b7be91..fa9fbf3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.thirdparty.com.google.protobuf
     .InvalidProtocolBufferException;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
@@ -55,8 +58,10 @@ import org.apache.ratis.statemachine.impl.TransactionContextImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -115,6 +120,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       createContainerFutureMap;
   private ExecutorService[] executors;
   private final int numExecutors;
+  private final Map<Long, Long> containerCommandCompletionMap;
   /**
    * CSM metrics.
    */
@@ -131,6 +137,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     this.createContainerFutureMap = new ConcurrentHashMap<>();
     this.numExecutors = numOfExecutors;
     executors = new ExecutorService[numExecutors];
+    containerCommandCompletionMap = new ConcurrentHashMap<>();
     for (int i = 0; i < numExecutors; i++) {
       executors[i] = Executors.newSingleThreadExecutor();
     }
@@ -151,10 +158,47 @@
       throws IOException {
     super.initialize(server, id, raftStorage);
     storage.init(raftStorage);
-    // TODO handle snapshots
-    // TODO: Add a flag that tells you that initialize has been called.
-    // Check with Ratis if this feature is done in Ratis.
+    loadSnapshot(storage.getLatestSnapshot());
+  }
+
+  private long loadSnapshot(SingleFileSnapshotInfo snapshot) {
+    if (snapshot == null) {
+      TermIndex empty = TermIndex.newTermIndex(0, 0);
+      LOG.info("The snapshot info is null."
+          + "Setting the last applied index to:" + empty);
+      setLastAppliedTermIndex(empty);
+      return RaftServerConstants.INVALID_LOG_INDEX;
+    }
+
+    final TermIndex last =
+        SimpleStateMachineStorage.getTermIndexFromSnapshotFile(
+            snapshot.getFile().getPath().toFile());
+    LOG.info("Setting the last applied index to " + last);
+    setLastAppliedTermIndex(last);
+    return last.getIndex();
+  }
+
+  @Override
+  public long takeSnapshot() throws IOException {
+    TermIndex ti = getLastAppliedTermIndex();
+    LOG.info("Taking snapshot at termIndex:" + ti);
+    if (ti != null) {
+      final File snapshotFile =
+          storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
+      LOG.info("Taking a snapshot to file {}", snapshotFile);
+      try {
+        //TODO: For now, just create the file to save the term index,
+        //persist open container info to snapshot later.
+        snapshotFile.createNewFile();
+      } catch(IOException ioe) {
+        LOG.warn("Failed to write snapshot file \"" + snapshotFile
+            + "\", last applied index=" + ti);
+        throw ioe;
+      }
+      return ti.getIndex();
+    }
+    return -1;
   }
 
   @Override
@@ -353,10 +397,9 @@
   public CompletableFuture<Void> flushStateMachineData(long index) {
     List<CompletableFuture<Message>> futureList =
         writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index)
-            .map(x -> x.getValue()).collect(Collectors.toList());
-    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+            .map(Map.Entry::getValue).collect(Collectors.toList());
+    return CompletableFuture.allOf(
         futureList.toArray(new CompletableFuture[futureList.size()]));
-    return combinedFuture;
   }
   /*
    * This api is used by the leader while appending logs to the follower
@@ -394,11 +437,28 @@
     }
   }
 
+  private void updateLastApplied() {
+    Long appliedTerm = null;
+    long appliedIndex = -1;
+    for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {
+      final Long removed = containerCommandCompletionMap.remove(i);
+      if (removed == null) {
+        break;
+      }
+      appliedTerm = removed;
+      appliedIndex = i;
+    }
+    if (appliedTerm != null) {
+      updateLastAppliedTermIndex(appliedIndex, appliedTerm);
+    }
+  }
+
   /*
    * ApplyTransaction calls in Ratis are sequential.
    */
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    long index = trx.getLogEntry().getIndex();
     try {
       metrics.incNumApplyTransactionsOps();
       ContainerCommandRequestProto requestProto =
@@ -418,7 +478,7 @@
             blockDataProto.getBlockID());
         return completeExceptionally(ioe);
       }
-      blockData.setBlockCommitSequenceId(trx.getLogEntry().getIndex());
+      blockData.setBlockCommitSequenceId(index);
       final ContainerProtos.PutBlockRequestProto putBlockRequestProto =
           ContainerProtos.PutBlockRequestProto
               .newBuilder(requestProto.getPutBlock())
@@ -440,6 +500,14 @@
         future.thenApply(
             r -> createContainerFutureMap.remove(containerID).complete(null));
       }
+
+      future.thenAccept(m -> {
+        final Long previous =
+            containerCommandCompletionMap
+                .put(index, trx.getLogEntry().getTerm());
+        Preconditions.checkState(previous == null);
+        updateLastApplied();
+      });
       return future;
     } catch (IOException e) {
       metrics.incNumApplyTransactionsFails();
@@ -466,7 +534,8 @@
 
   @Override
   public void close() throws IOException {
-    for (int i = 0; i < numExecutors; i++){
+    takeSnapshot();
+    for (int i = 0; i < numExecutors; i++) {
       executors[i].shutdown();
     }
   }
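
Because the futures returned from applyTransaction can complete out of order,
the state machine above records the term of every finished log index in
containerCommandCompletionMap and only advances the applied TermIndex across a
contiguous run of completed indices (updateLastApplied). A standalone sketch of
that idea; the class and field names here are illustrative, not part of the
patch:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class LastAppliedTracker {
      // log index -> term of that entry, recorded when its apply future completes
      private final Map<Long, Long> completed = new ConcurrentHashMap<>();
      private long lastAppliedIndex;
      private long lastAppliedTerm;

      public synchronized void onApplied(long index, long term) {
        completed.put(index, term);
        // Advance only while the next index is present; a gap (an entry that is
        // still being applied) stops the applied index from moving past it.
        for (long i = lastAppliedIndex + 1; ; i++) {
          Long t = completed.remove(i);
          if (t == null) {
            break;
          }
          lastAppliedIndex = i;
          lastAppliedTerm = t;
        }
      }

      public synchronized long getLastAppliedIndex() {
        return lastAppliedIndex;
      }

      public synchronized long getLastAppliedTerm() {
        return lastAppliedTerm;
      }
    }
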
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82919a1e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 0cd407a..f0c2845 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -130,6 +130,10 @@ public final class XceiverServerRatis implements XceiverServerSpi {
         .build();
   }
 
+  @VisibleForTesting
+  public ContainerStateMachine getStateMachine() {
+    return stateMachine;
+  }
 
   private RaftProperties newRaftProperties(Configuration conf) {
     final RaftProperties properties = new RaftProperties();
@@ -254,6 +258,15 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     } else if (rpc == SupportedRpcType.NETTY) {
       NettyConfigKeys.Server.setPort(properties, port);
     }
+
+    long snapshotThreshold =
+        conf.getLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY,
+            OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT);
+    RaftServerConfigKeys.Snapshot.
+        setAutoTriggerEnabled(properties, true);
+    RaftServerConfigKeys.Snapshot.
+        setAutoTriggerThreshold(properties, snapshotThreshold);
+
     return properties;
   }
 
@@ -298,7 +311,6 @@
   public void stop() {
     try {
       chunkExecutor.shutdown();
-      stateMachine.close();
       server.close();
     } catch (IOException e) {
       throw new RuntimeException(e);
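
Isolated from the rest of newRaftProperties() above, the Ratis side of the
feature is just two server properties: enable automatic snapshot triggering and
feed it the threshold read from the new Ozone key. A minimal sketch under that
assumption; the helper and class names are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ozone.OzoneConfigKeys;
    import org.apache.ratis.conf.RaftProperties;
    import org.apache.ratis.server.RaftServerConfigKeys;

    public final class SnapshotProperties {
      // Returns RaftProperties that make the Ratis server call
      // StateMachine#takeSnapshot on its own once enough log entries have been
      // applied since the previous snapshot.
      public static RaftProperties withAutoSnapshots(Configuration conf) {
        RaftProperties properties = new RaftProperties();
        long threshold = conf.getLong(
            OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY,
            OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT);
        RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
        RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, threshold);
        return properties;
      }

      private SnapshotProperties() {
      }
    }
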
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82919a1e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 503112d..9fac3cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -230,6 +230,11 @@ public class OzoneContainer {
     return this.hddsDispatcher;
   }
 
+  @VisibleForTesting
+  public XceiverServerSpi getServer(ReplicationType replicationType) {
+    return servers.get(replicationType);
+  }
+
   public VolumeSet getVolumeSet() {
     return volumeSet;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82919a1e/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
new file mode 100644
index 0000000..a1c50b6
--- /dev/null
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests Freon with Datanode restarts.
+ */
+public class TestFreonWithDatanodeRestart {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setHbProcessorInterval(1000)
+        .setHbInterval(1000)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testRestart() throws Exception {
+    RandomKeyGenerator randomKeyGenerator =
+        new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
+    randomKeyGenerator.setNumOfVolumes(1);
+    randomKeyGenerator.setNumOfBuckets(1);
+    randomKeyGenerator.setNumOfKeys(1);
+    randomKeyGenerator.setType(ReplicationType.RATIS);
+    randomKeyGenerator.setFactor(ReplicationFactor.THREE);
+    randomKeyGenerator.setKeySize(20971520);
+    randomKeyGenerator.setValidateWrites(true);
+    randomKeyGenerator.call();
+    Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
+    Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated());
+    Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded());
+    Assert.assertEquals(0, randomKeyGenerator.getUnsuccessfulValidationCount());
+
+    ContainerStateMachine sm = getStateMachine();
+    TermIndex termIndexBeforeRestart = sm.getLastAppliedTermIndex();
+    cluster.restartHddsDatanode(0);
+    sm = getStateMachine();
+    SimpleStateMachineStorage storage =
+        (SimpleStateMachineStorage)sm.getStateMachineStorage();
+    SingleFileSnapshotInfo snapshotInfo = storage.getLatestSnapshot();
+    TermIndex termInSnapshot = snapshotInfo.getTermIndex();
+    String expectedSnapFile =
+        storage.getSnapshotFile(termIndexBeforeRestart.getTerm(),
+            termIndexBeforeRestart.getIndex()).getAbsolutePath();
+    Assert.assertEquals(snapshotInfo.getFile().getPath().toString(),
+        expectedSnapFile);
+    Assert.assertEquals(termInSnapshot, termIndexBeforeRestart);
+
+    // After restart the term index might have progressed to apply pending
+    // transactions.
+    TermIndex termIndexAfterRestart = sm.getLastAppliedTermIndex();
+    Assert.assertTrue(termIndexAfterRestart.getIndex() >=
+        termIndexBeforeRestart.getIndex());
+  }
+
+  private ContainerStateMachine getStateMachine() {
+    XceiverServerSpi server =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
+            getContainer().getServer(HddsProtos.ReplicationType.RATIS);
+    return ((XceiverServerRatis)server).getStateMachine();
+  }
+}
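
The restart assertions above work because the snapshot file itself encodes the
term and index: takeSnapshot() obtains the file name from
SimpleStateMachineStorage.getSnapshotFile(term, index), and loadSnapshot()
recovers the TermIndex from that name after the datanode comes back. A hedged
sketch of that round trip; it assumes a storage instance that has already been
initialized by the state machine, and the helper name is illustrative:

    import java.io.File;
    import java.io.IOException;

    import org.apache.ratis.server.protocol.TermIndex;
    import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;

    public final class SnapshotFileRoundTrip {
      // Writes an (empty) snapshot file for the given term/index and reads the
      // TermIndex back from its name, mirroring takeSnapshot()/loadSnapshot().
      public static TermIndex roundTrip(SimpleStateMachineStorage storage,
          TermIndex applied) throws IOException {
        File snapshotFile =
            storage.getSnapshotFile(applied.getTerm(), applied.getIndex());
        if (!snapshotFile.exists() && !snapshotFile.createNewFile()) {
          throw new IOException("Could not create " + snapshotFile);
        }
        return SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
      }

      private SnapshotFileRoundTrip() {
      }
    }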