This is an automated email from the ASF dual-hosted git repository.
rajeshbabu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 741d8bd RATIS-556 Detect node failures and close the log to prevent
additional writes (Rajeshbabu)
741d8bd is described below
commit 741d8bd4fbb1c9642dd054e72cae0c7afb180170
Author: Rajeshbabu Chintaguntla <Rajeshbabu Chintaguntla>
AuthorDate: Thu Sep 5 10:28:55 2019 +0530
RATIS-556 Detect node failures and close the log to prevent additional
writes (Rajeshbabu)
---
.../apache/ratis/logservice/common/Constants.java | 8 ++
.../apache/ratis/logservice/server/LogServer.java | 40 ++++++
.../ratis/logservice/server/LogStateMachine.java | 16 ++-
.../ratis/logservice/server/MetaStateMachine.java | 138 ++++++++++++++++++---
.../ratis/logservice/server/MetadataServer.java | 13 +-
.../ratis/logservice/util/LogServiceProtoUtil.java | 2 +-
.../logservice/util/MetaServiceProtoUtil.java | 11 +-
ratis-logservice/src/main/proto/LogService.proto | 1 +
ratis-logservice/src/main/proto/MetaService.proto | 9 +-
.../ratis/logservice/server/TestMetaServer.java | 40 +++++-
10 files changed, 244 insertions(+), 34 deletions(-)
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
index be46177..7151341 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
@@ -58,4 +58,12 @@ public class Constants {
public static final String RATIS_RAFT_SEGMENT_SIZE_KEY =
"ratis.raft.segment.size";
public static final long DEFAULT_RATIS_RAFT_SEGMENT_SIZE = 32 * 1024
*1024;// 32MB
+ public static final String LOG_SERVICE_HEARTBEAT_INTERVAL_KEY =
+ "logservice.heartbeat.interval"; // in ms
+ public static final long DEFAULT_HEARTBEAT_INTERVAL = 3000;// 3 seconds
+
+ public static final String LOG_SERVICE_PEER_FAILURE_DETECTION_PERIOD_KEY =
+ "logservice.peer.failure.detection.period"; // in ms
+ public static final long DEFAULT_PEER_FAILURE_DETECTION_PERIOD = 60000;//
1 min.
+
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
index 50da769..6a3e926 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
@@ -40,6 +40,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@@ -53,6 +54,8 @@ public class LogServer extends BaseServer {
private RaftServer raftServer = null;
private RaftClient metaClient = null;
+ private Daemon daemon = null;
+ private long heartbeatInterval = Constants.DEFAULT_HEARTBEAT_INTERVAL;
public LogServer(ServerOpts opts) {
super(opts);
LOG.debug("Log Server options: {}", opts);
@@ -84,6 +87,13 @@ public class LogServer extends BaseServer {
if (archiveLocation != null) {
properties.set(Constants.LOG_SERVICE_ARCHIVAL_LOCATION_KEY,
archiveLocation);
}
+ heartbeatInterval =
getConfig().getLong(Constants.LOG_SERVICE_HEARTBEAT_INTERVAL_KEY,
+ Constants.DEFAULT_HEARTBEAT_INTERVAL);
+ if(heartbeatInterval <= 0) {
+ LOG.warn("Heartbeat interval configuration is invalid." +
+ " Setting default value "+
Constants.DEFAULT_HEARTBEAT_INTERVAL);
+ heartbeatInterval = Constants.DEFAULT_HEARTBEAT_INTERVAL;
+ }
RaftServerConfigKeys.Log.setSegmentSizeMax(properties, segmentSizeBytes);
RaftServerConfigKeys.Log.setPreallocatedSize(properties,
segmentSizeBytes);
@@ -136,6 +146,9 @@ public class LogServer extends BaseServer {
.setProperties(properties)
.build();
metaClient.send(() ->
MetaServiceProtoUtil.toPingRequestProto(peer).toByteString());
+ daemon = new Daemon(new HeartbeatSender(new
RaftPeer(raftServer.getId())),
+ "heartbeat-Sender"+raftServer.getId());
+ daemon.start();
}
public static void main(String[] args) throws IOException {
@@ -164,6 +177,7 @@ public class LogServer extends BaseServer {
public void close() throws IOException {
raftServer.close();
+ daemon.interrupt();
}
public static class Builder extends BaseServer.Builder<LogServer> {
@@ -172,4 +186,30 @@ public class LogServer extends BaseServer {
return new LogServer(getOpts());
}
}
+
+ private class HeartbeatSender implements Runnable {
+
+ RaftPeer peer;
+ public HeartbeatSender(RaftPeer peer) {
+ this.peer = peer;
+ }
+
+ @Override
+ public void run() {
+
+ while (true) {
+ try {
+ metaClient.send(() -> MetaServiceProtoUtil.
+ toHeartbeatRequestProto(peer).toByteString());
+ Thread.sleep(heartbeatInterval);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ } catch (IOException e) {
+ LOG.warn("Heartbeat request failed with exception", e);
+ }
+ }
+
+ }
+ }
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index 6b631a0..1d8cbcb 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -532,28 +532,34 @@ public class LogStateMachine extends BaseStateMachine {
// TODO need to handle exceptions while operating with files.
State targetState = State.valueOf(changeState.getState().name());
+ Throwable t = null;
//if forced skip checking states
if(!changeState.getForce()) {
switch (targetState) {
case OPEN:
if (state != null) {
- verifyState(State.OPEN, State.CLOSED);
+ t = verifyState(State.OPEN, State.CLOSED);
}
break;
case CLOSED:
- verifyState(State.OPEN);
+ t = verifyState(State.OPEN);
break;
case ARCHIVED:
- verifyState(State.ARCHIVING);
+ t = verifyState(State.ARCHIVING);
break;
case ARCHIVING:
- verifyState(State.CLOSED);
+ t = verifyState(State.CLOSED);
break;
case DELETED:
- verifyState(State.CLOSED);
+ t = verifyState(State.CLOSED);
break;
}
}
+ if(t != null) {
+ return CompletableFuture.completedFuture(Message
+ .valueOf(LogServiceProtos.ChangeStateReplyProto.newBuilder().
+
setException(LogServiceProtoUtil.toLogException(t)).build().toByteString()));
+ }
this.state = targetState;
return CompletableFuture.completedFuture(Message
.valueOf(LogServiceProtos.ChangeStateReplyProto.newBuilder().build().toByteString()));
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index 5e0785b..e50b39c 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -19,11 +19,7 @@
package org.apache.ratis.logservice.server;
import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
@@ -37,10 +33,13 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.logservice.api.LogInfo;
import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.common.Constants;
import org.apache.ratis.logservice.common.LogAlreadyExistException;
import org.apache.ratis.logservice.common.LogNotFoundException;
import org.apache.ratis.logservice.common.NoEnoughWorkersException;
import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
+import org.apache.ratis.logservice.proto.LogServiceProtos;
import org.apache.ratis.logservice.proto.MetaServiceProtos;
import
org.apache.ratis.logservice.proto.MetaServiceProtos.CreateLogRequestProto;
import
org.apache.ratis.logservice.proto.MetaServiceProtos.DeleteLogRequestProto;
@@ -53,18 +52,16 @@ import
org.apache.ratis.logservice.util.MetaServiceProtoUtil;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.metrics.impl.RatisMetricRegistryImpl;
import org.apache.ratis.proto.RaftProtos;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.*;
+
+
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,10 +86,15 @@ public class MetaStateMachine extends BaseStateMachine {
// keep a copy of raftServer to get group information.
private RaftServer raftServer;
+ private Map<RaftPeer, Set<LogName>> peerLogs = new ConcurrentHashMap<>();
+
+ private Map<RaftPeer, Long> heartbeatInfo = new ConcurrentHashMap<>();
private RaftGroup currentGroup = null;
+ private Daemon peerHealthChecker = null;
// MinHeap queue for load balancing groups across the peers
+ private long failureDetectionPeriod =
Constants.DEFAULT_PEER_FAILURE_DETECTION_PERIOD;
private PriorityBlockingQueue<PeerGroups> avail = new
PriorityBlockingQueue<PeerGroups>();
//Properties
@@ -104,9 +106,11 @@ public class MetaStateMachine extends BaseStateMachine {
private RaftGroupId logServerGroupId;
private RatisMetricRegistry metricRegistry;
- public MetaStateMachine(RaftGroupId metadataGroupId, RaftGroupId
logServerGroupId) {
+ public MetaStateMachine(RaftGroupId metadataGroupId, RaftGroupId
logServerGroupId,
+ long failureDetectionPeriod) {
this.metadataGroupId = metadataGroupId;
this.logServerGroupId = logServerGroupId;
+ this.failureDetectionPeriod = failureDetectionPeriod;
}
@Override
@@ -115,6 +119,8 @@ public class MetaStateMachine extends BaseStateMachine {
this.metricRegistry = LogServiceMetricsRegistry
.createMetricRegistryForLogServiceMetaData(server.getId().toString());
super.initialize(server, groupId, storage);
+ peerHealthChecker = new Daemon(new
PeerHealthChecker(),"peer-Health-Checker");
+ peerHealthChecker.start();
}
@Override
@@ -131,7 +137,19 @@ public class MetaStateMachine extends BaseStateMachine {
LogServiceRegisterLogRequestProto r = req.getRegisterRequest();
LogName logname =
LogServiceProtoUtil.toLogName(r.getLogname());
RaftGroup rg =
MetaServiceProtoUtil.toRaftGroup(r.getRaftGroup());
+ rg.getPeers().stream().forEach(raftPeer -> {
+ Set<LogName> logNames;
+ if(!peerLogs.containsKey(raftPeer)) {
+ logNames = new HashSet<>();
+ peerLogs.put(raftPeer, logNames);
+ } else {
+ logNames = peerLogs.get(raftPeer);
+ }
+ logNames.add(logname);
+
+ });
map.put(logname, rg);
+
LOG.info("Log {} registered at {} with group {} ", logname,
getId(), rg );
break;
case UNREGISTERREQUEST:
@@ -147,9 +165,14 @@ public class MetaStateMachine extends BaseStateMachine {
} else {
peers.add(peer);
avail.add(new PeerGroups(peer));
+ heartbeatInfo.put(peer, System.currentTimeMillis());
}
break;
-
+ case HEARTBEATREQUEST:
+ MetaServiceProtos.LogServiceHeartbeatRequestProto
heartbeatRequest = req.getHeartbeatRequest();
+ RaftPeer heartbeatPeer =
MetaServiceProtoUtil.toRaftPeer(heartbeatRequest.getPeer());
+ heartbeatInfo.put(heartbeatPeer, System.currentTimeMillis());
+ break;
default:
}
return super.applyTransactionSerial(trx);
@@ -328,15 +351,14 @@ public class MetaStateMachine extends BaseStateMachine {
.build();
try {
client.send(() ->
MetaServiceProtos.MetaSMRequestProto.newBuilder()
-
.setRegisterRequest(LogServiceRegisterLogRequestProto
- .newBuilder()
+
.setRegisterRequest(LogServiceRegisterLogRequestProto.newBuilder()
.setLogname(LogServiceProtoUtil.toLogNameProto(name))
.setRaftGroup(MetaServiceProtoUtil
.toRaftGroupProto(raftGroup)))
.build().toByteString());
} catch (IOException e) {
LOG.error(
- "Exception while registring raft group with Metadata
Service during creation of log");
+ "Exception while registering raft group with Metadata
Service during creation of log");
e.printStackTrace();
}
return
CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
@@ -382,7 +404,6 @@ public class MetaStateMachine extends BaseStateMachine {
public PeerGroups(RaftPeer peer) {
this.peer = peer;
-
}
public Set<RaftGroup> getGroups () {
@@ -398,4 +419,87 @@ public class MetaStateMachine extends BaseStateMachine {
return groups.size() - ((PeerGroups) o).groups.size();
}
}
+
+ private class PeerHealthChecker implements Runnable {
+ @Override
+ public void run() {
+ while(true) {
+ try {
+ Thread.sleep(1000);
+ long now = System.currentTimeMillis();
+ heartbeatInfo.keySet().stream().forEach(raftPeer -> {
+ Long heartbeatTimestamp = heartbeatInfo.get(raftPeer);
+ // Introduce configuration for period to detect the
failure.
+ if((now - heartbeatTimestamp) >
failureDetectionPeriod) {
+ // Close the logs serve by peer if any.
+ if (peerLogs.containsKey(raftPeer)) {
+ LOG.warn("Closing all logs hosted by peer {}
because last heartbeat" +
+ " ({}ms) exceeds the threshold
({}ms)", raftPeer, now - heartbeatTimestamp,
+ failureDetectionPeriod);
+ peers.remove(raftPeer);
+ Set<LogName> logNames = peerLogs.get(raftPeer);
+ Iterator<LogName> itr = logNames.iterator();
+ while(itr.hasNext()) {
+ LogName logName = itr.next();
+ RaftGroup group = map.get(logName);
+ RaftClient client =
RaftClient.newBuilder().
+
setRaftGroup(group).setProperties(properties).build();
+ try {
+ LOG.warn(String.format("Peer %s in the
group %s went down." +
+ " Hence closing the
log %s serve by the group.",
+ raftPeer.toString(),
group.toString(), logName.toString()));
+ RaftClientReply reply = client.send(
+ () -> LogServiceProtoUtil.
+
toChangeStateRequestProto(logName, LogStream.State.CLOSED, true)
+ .toByteString());
+ LogServiceProtos.ChangeStateReplyProto
message =
+
LogServiceProtos.ChangeStateReplyProto.parseFrom(reply.getMessage().getContent());
+ if(message.hasException()) {
+ throw new
IOException(message.getException().getErrorMsg());
+ }
+ itr.remove();
+ client.close();
+ } catch (IOException e) {
+ LOG.warn(String.format("Failed to
close log %s on peer %s failure.",
+ logName, raftPeer.toString()),
e);
+ }
+ }
+ if(logNames.isEmpty()) {
+ peerLogs.remove(raftPeer);
+ heartbeatInfo.remove(raftPeer);
+ } // else retry closing failed logs on next
period.
+ }
+ final List<PeerGroups> peerGroupsToRemove = new
ArrayList<>();
+ // remove peer groups from avail.
+ avail.stream().forEach(peerGroup -> {
+ if(peerGroup.getPeer().equals(raftPeer)) {
+ peerGroupsToRemove.add(peerGroup);
+ }
+ });
+ for(PeerGroups peerGroups: peerGroupsToRemove) {
+ avail.remove(peerGroups);
+ }
+ }
+ });
+ } catch (Exception e) {
+ LOG.error(
+ "Exception while closing logs and removing peer" +
+ " from raft groups with Metadata Service
on node failure", e);
+ }
+ }
+ }
+ }
+
+
+ // This method should only be used for testing.
+ public boolean checkPeersAreSame() {
+ if(!peers.equals(peerLogs.keySet())) return false;
+ if(!peers.equals(heartbeatInfo.keySet())) return false;
+ Set<RaftPeer> availPeers = new HashSet<>();
+ avail.stream().forEach(peerGroups -> {
+ availPeers.add(peerGroups.getPeer());
+ });
+ if(!peers.equals(availPeers)) return false;
+ return true;
+ }
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java
index 3bd75a7..9c914ea 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java
@@ -51,7 +51,7 @@ public class MetadataServer extends BaseServer {
private String id;
- private StateMachine metaStateMachine;
+ StateMachine metaStateMachine;
private LifeCycle lifeCycle;
@@ -82,11 +82,14 @@ public class MetadataServer extends BaseServer {
// Set properties common to all log service state machines
setRaftProperties(properties);
-
+ long failureDetectionPeriod = getConfig().
+
getLong(Constants.LOG_SERVICE_PEER_FAILURE_DETECTION_PERIOD_KEY,
+ Constants.DEFAULT_PEER_FAILURE_DETECTION_PERIOD);
Set<RaftPeer> peers = getPeersFromQuorum(opts.getMetaQuorum());
RaftGroupId raftMetaGroupId =
RaftGroupId.valueOf(opts.getMetaGroupId());
RaftGroup metaGroup = RaftGroup.valueOf(raftMetaGroupId, peers);
- metaStateMachine = new MetaStateMachine(raftMetaGroupId,
RaftGroupId.valueOf(opts.getLogServerGroupId()));
+ metaStateMachine = new MetaStateMachine(raftMetaGroupId,
RaftGroupId.valueOf(opts.getLogServerGroupId()),
+ failureDetectionPeriod);
// Make sure that we aren't setting any invalid/harmful properties
validateRaftProperties(properties);
@@ -159,4 +162,8 @@ public class MetadataServer extends BaseServer {
return new MetadataServer(getOpts());
}
}
+
+ public RaftServer getServer() {
+ return server;
+ }
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
index 8b8356e..89b53cb 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
@@ -61,7 +61,7 @@ public class LogServiceProtoUtil {
LogNameProto logNameProto =
LogNameProto.newBuilder().setName(logName.getName()).build();
ChangeStateLogRequestProto changeLog =
ChangeStateLogRequestProto.newBuilder().setLogName(logNameProto)
- .setState(LogStreamState.valueOf(state.name())).build();
+
.setState(LogStreamState.valueOf(state.name())).setForce(force).build();
return
LogServiceRequestProto.newBuilder().setChangeState(changeLog).build();
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
index 90227b1..8384d23 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
@@ -24,6 +24,7 @@ import org.apache.ratis.logservice.api.LogName;
import org.apache.ratis.logservice.proto.LogServiceProtos;
import org.apache.ratis.logservice.proto.MetaServiceProtos;
import org.apache.ratis.logservice.proto.MetaServiceProtos.*;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.*;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.ReflectionUtils;
@@ -81,6 +82,15 @@ public class MetaServiceProtoUtil {
.setPeer(MetaServiceProtoUtil.toRaftPeerProto(peer)).build()).build();
}
+ public static MetaSMRequestProto toHeartbeatRequestProto(RaftPeer peer) {
+ return MetaServiceProtos.MetaSMRequestProto
+ .newBuilder()
+ .setHeartbeatRequest(
+ MetaServiceProtos.LogServiceHeartbeatRequestProto
+ .newBuilder()
+
.setPeer(MetaServiceProtoUtil.toRaftPeerProto(peer)).build()).build();
+ }
+
public static MetaServiceRequestProto toCreateLogRequestProto(LogName
logName) {
LogServiceProtos.LogNameProto logNameProto =
LogServiceProtos.LogNameProto.newBuilder()
.setName(logName.getName())
@@ -193,7 +203,6 @@ public class MetaServiceProtoUtil {
};
}
-
public static DeleteLogReplyProto toDeleteLogReplyProto() {
return DeleteLogReplyProto.newBuilder().build();
}
diff --git a/ratis-logservice/src/main/proto/LogService.proto
b/ratis-logservice/src/main/proto/LogService.proto
index 5c06f95..eb27842 100644
--- a/ratis-logservice/src/main/proto/LogService.proto
+++ b/ratis-logservice/src/main/proto/LogService.proto
@@ -61,6 +61,7 @@ message GetStateRequestProto {
}
message ChangeStateReplyProto {
+ LogServiceException exception = 1;
}
message GetStateReplyProto {
diff --git a/ratis-logservice/src/main/proto/MetaService.proto
b/ratis-logservice/src/main/proto/MetaService.proto
index 16e0232..eef10e4 100644
--- a/ratis-logservice/src/main/proto/MetaService.proto
+++ b/ratis-logservice/src/main/proto/MetaService.proto
@@ -106,6 +106,13 @@ message LogServicePingRequestProto {
RaftPeerProto peer = 1;
}
+message LogServiceHeartbeatRequestProto {
+ RaftPeerProto peer = 1;
+}
+
+message LogServiceHeartbeatReplyProto {
+}
+
// Internal StateMachine change request
// includes: all operations with workers and raft groups.
message MetaSMRequestProto {
@@ -113,7 +120,7 @@ message MetaSMRequestProto {
LogServicePingRequestProto pingRequest = 1;
LogServiceRegisterLogRequestProto registerRequest = 2;
LogServiceUnregisterLogRequestProto unregisterRequest = 3;
-
+ LogServiceHeartbeatRequestProto heartbeatRequest = 4;
}
}
diff --git
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
index 886f058..e30a8c4 100644
---
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
+++
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
@@ -27,6 +27,7 @@ import
org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
import org.apache.ratis.logservice.proto.MetaServiceProtos;
import org.apache.ratis.logservice.util.LogServiceCluster;
import org.apache.ratis.logservice.util.TestUtils;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.junit.AfterClass;
@@ -42,17 +43,15 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
+import static org.junit.Assert.*;
+
public class TestMetaServer {
static LogServiceCluster cluster = null;
+ static List<LogServer> workers = null;
static AtomicInteger createCount = new AtomicInteger();
static AtomicInteger deleteCount = new AtomicInteger();
static AtomicInteger listCount = new AtomicInteger();
@@ -77,7 +76,7 @@ public class TestMetaServer {
public static void beforeClass() {
cluster = new LogServiceCluster(3);
cluster.createWorkers(3);
- List<LogServer> workers = cluster.getWorkers();
+ workers = cluster.getWorkers();
assert(workers.size() == 3);
}
@@ -101,6 +100,35 @@ public class TestMetaServer {
assertNotNull(logStream2);
}
+ /**
+ * Tests that logs are closed when one of their hosting peers fails.
+ * @throws Exception
+ */
+ @Test
+ public void testCloseLogOnNodeFailure() throws Exception {
+ boolean peerClosed = false;
+ try {
+ for(int i = 0; i < 5; i++) {
+ LogStream logStream1 =
client.createLog(LogName.of("testCloseLogOnNodeFailure"+i));
+ assertNotNull(logStream1);
+ }
+
assertTrue(((MetaStateMachine)cluster.getMasters().get(0).metaStateMachine).checkPeersAreSame());
+ workers.get(0).close();
+ peerClosed = true;
+ Thread.sleep(90000);
+
assertTrue(((MetaStateMachine)cluster.getMasters().get(0).metaStateMachine).checkPeersAreSame());
+ for(int i = 0; i < 5; i++) {
+ LogStream logStream2 =
client.getLog(LogName.of("testCloseLogOnNodeFailure"+i));
+ assertNotNull(logStream2);
+ assertEquals(State.CLOSED, logStream2.getState());
+ }
+ } finally {
+ if(peerClosed) {
+ // recreate the worker closed in the test.
+ cluster.createWorkers(1);
+ }
+ }
+ }
@Test
public void testReadWritetoLog() throws IOException, InterruptedException {