This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b7a8ff334b IGNITE-17465 Backport JRaft improvements up to 1.3.9 -
Fixes #1049.
b7a8ff334b is described below
commit b7a8ff334b46a781704daf66512040509434052f
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon Feb 20 15:28:40 2023 +0300
IGNITE-17465 Backport JRaft improvements up to 1.3.9 - Fixes #1049.
Signed-off-by: Alexey Scherbakov <[email protected]>
---
.../ignite/raft/jraft/core/ItCliServiceTest.java | 18 +++
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 5 +
.../org/apache/ignite/raft/jraft/CliService.java | 11 ++
.../java/org/apache/ignite/raft/jraft/Node.java | 15 +-
.../ignite/raft/jraft/closure/JoinableClosure.java | 7 +-
.../ignite/raft/jraft/core/CliServiceImpl.java | 109 ++++++-------
.../ignite/raft/jraft/core/FSMCallerImpl.java | 2 +
.../apache/ignite/raft/jraft/core/NodeImpl.java | 52 +++---
.../raft/jraft/core/ReadOnlyServiceImpl.java | 25 ++-
.../apache/ignite/raft/jraft/core/Replicator.java | 177 +++++++++++++++------
.../apache/ignite/raft/jraft/entity/Checksum.java | 18 +++
.../apache/ignite/raft/jraft/entity/LogEntry.java | 22 +--
.../org/apache/ignite/raft/jraft/entity/Task.java | 14 +-
.../ignite/raft/jraft/rpc/InvokeContext.java | 4 +
.../rpc/impl/core/DefaultRaftClientService.java | 26 ++-
.../raft/jraft/storage/impl/LogManagerImpl.java | 6 +-
.../snapshot/local/LocalSnapshotStorage.java | 2 +-
.../jraft/storage/snapshot/remote/CopySession.java | 4 +
.../apache/ignite/raft/jraft/util/SegmentList.java | 4 +-
.../raft/jraft/util/StorageOptionsFactory.java | 11 +-
.../org/apache/ignite/raft/jraft/util/Utils.java | 38 +++++
.../ignite/raft/jraft/core/ReplicatorTest.java | 4 +-
.../ignite/raft/jraft/entity/LogEntryTest.java | 5 +-
.../entity/codec/BaseLogEntryCodecFactoryTest.java | 23 ++-
.../ItRaftCommandLeftInLogUntilRestartTest.java | 4 +-
25 files changed, 410 insertions(+), 196 deletions(-)
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
index 931b155021..82589b73ae 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
@@ -23,6 +23,7 @@ import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.raft.jraft.core.TestCluster.ELECTION_TIMEOUT_MILLIS;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -271,6 +272,23 @@ public class ItCliServiceTest {
sleep(1000);
assertEquals(Collections.singletonList(learner3.getPeerId()),
cliService.getLearners(groupId, conf));
assertTrue(cliService.getAliveLearners(groupId, conf).isEmpty());
+
+ TestPeer learner4 = new TestPeer(testInfo, TestUtils.INIT_PORT +
LEARNER_PORT_STEP + 4);
+ assertTrue(cluster.startLearner(learner4));
+
+ cliService.addLearners(groupId, conf,
Collections.singletonList(learner4.getPeerId()));
+ sleep(1000);
+ assertEquals(1, cliService.getAliveLearners(groupId, conf).size());
+ assertTrue(cliService.learner2Follower(groupId, conf,
learner4.getPeerId()).isOk());
+
+ sleep(1000);
+ List<PeerId> currentLearners = cliService.getAliveLearners(groupId,
conf);
+ assertFalse(currentLearners.contains(learner4.getPeerId()));
+
+ List<PeerId> currentFollowers = cliService.getPeers(groupId, conf);
+ assertTrue(currentFollowers.contains(learner4.getPeerId()));
+
+ cluster.stop(learner4.getPeerId());
}
@Test
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index b65a3f3652..0066219717 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -549,6 +549,11 @@ public class ItNodeTest {
LOG.info("Replicator has been created {} {}", peer, val);
}
+ @Override
+ public void stateChanged(final PeerId peer, final ReplicatorState
newState) {
+ LOG.info("Replicator {} state is changed into {}.", peer,
newState);
+ }
+
/** {@inheritDoc} */
@Override
public void onError(PeerId peer, Status status) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/CliService.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/CliService.java
index 2974e3944d..1897b0e356 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/CliService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/CliService.java
@@ -87,6 +87,17 @@ public interface CliService extends Lifecycle<CliOptions> {
*/
Status removeLearners(final String groupId, final Configuration conf,
final List<PeerId> learners);
+ /**
+ * Converts the specified learner to a follower of |conf|.
+ * Returns OK status on success.
+ *
+ * @param groupId the raft group id
+ * @param conf current configuration
+ * @param learner learner peer
+ * @return operation status
+ */
+ Status learner2Follower(final String groupId, final Configuration conf,
final PeerId learner);
+
/**
* Update learners set in the replicating group which consists of |conf|.
return OK status when success.
*
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
index 0592d6d3dc..078729af46 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
@@ -21,6 +21,7 @@ import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.core.NodeMetrics;
import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.core.State;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.Task;
@@ -77,13 +78,6 @@ public interface Node extends Lifecycle<NodeOptions>,
Describer {
*/
boolean isLeader(final boolean blocking);
- /**
- * Shutdown local replica node.
- *
- * @param done callback
- */
- void shutdown(final Closure done);
-
/**
* Block the thread until the node is successfully stopped.
*
@@ -310,6 +304,13 @@ public interface Node extends Lifecycle<NodeOptions>,
Describer {
*/
int getNodeTargetPriority();
+ /**
+ * Get the node's state.
+ *
+ * @return node's state.
+ */
+ State getNodeState();
+
/**
* Get the node's current term.
*
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/closure/JoinableClosure.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/closure/JoinableClosure.java
index 1e763059e7..d81ae74b5d 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/closure/JoinableClosure.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/closure/JoinableClosure.java
@@ -37,8 +37,11 @@ public class JoinableClosure implements Closure {
@Override
public void run(final Status status) {
- this.closure.run(status);
- latch.countDown();
+ try {
+ this.closure.run(status);
+ } finally {
+ latch.countDown();
+ }
}
public void join() throws InterruptedException {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
index 02c656b40e..43d5e58c0a 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
@@ -20,6 +20,7 @@ import static java.util.stream.Collectors.toList;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -93,6 +94,36 @@ public class CliServiceImpl implements CliService {
this.cliClientService = null;
}
+ private void recordConfigurationChange(final String groupId, final
Collection<String> oldPeersList,
+ final Collection<String>
newPeersList) {
+ final Configuration oldConf = new Configuration();
+ for (final String peerIdStr : oldPeersList) {
+ final PeerId oldPeer = new PeerId();
+ oldPeer.parse(peerIdStr);
+ oldConf.addPeer(oldPeer);
+ }
+ final Configuration newConf = new Configuration();
+ for (final String peerIdStr : newPeersList) {
+ final PeerId newPeer = new PeerId();
+ newPeer.parse(peerIdStr);
+ newConf.addPeer(newPeer);
+ }
+ LOG.info("Configuration of replication group {} changed from {} to
{}.", groupId, oldConf, newConf);
+ }
+
+ private Status checkLeaderAndConnect(final String groupId, final
Configuration conf, final PeerId leaderId) {
+ final Status st = getLeader(groupId, conf, leaderId);
+ if (!st.isOk()) {
+ return st;
+ }
+
+ if (!this.cliClientService.connect(leaderId)) {
+ return new Status(-1, "Fail to init channel to leader %s",
leaderId);
+ }
+
+ return Status.OK();
+ }
+
@Override
public Status addPeer(final String groupId, final Configuration conf,
final PeerId peer) {
Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id");
@@ -100,15 +131,11 @@ public class CliServiceImpl implements CliService {
Requires.requireNonNull(peer, "Null peer");
final PeerId leaderId = new PeerId();
- final Status st = getLeader(groupId, conf, leaderId);
+ final Status st = checkLeaderAndConnect(groupId, conf, leaderId);
if (!st.isOk()) {
return st;
}
- if (!this.cliClientService.connect(leaderId)) {
- return new Status(-1, "Fail to init channel to leader %s",
leaderId);
- }
-
AddPeerRequest req = cliOptions.getRaftMessagesFactory()
.addPeerRequest()
.groupId(groupId)
@@ -120,20 +147,7 @@ public class CliServiceImpl implements CliService {
final Message result = this.cliClientService.addPeer(leaderId,
req, null).get();
if (result instanceof AddPeerResponse) {
final AddPeerResponse resp = (AddPeerResponse) result;
- final Configuration oldConf = new Configuration();
- for (final String peerIdStr : resp.oldPeersList()) {
- final PeerId oldPeer = new PeerId();
- oldPeer.parse(peerIdStr);
- oldConf.addPeer(oldPeer);
- }
- final Configuration newConf = new Configuration();
- for (final String peerIdStr : resp.newPeersList()) {
- final PeerId newPeer = new PeerId();
- newPeer.parse(peerIdStr);
- newConf.addPeer(newPeer);
- }
-
- LOG.info("Configuration of replication group {} changed from
{} to {}.", groupId, oldConf, newConf);
+ recordConfigurationChange(groupId, resp.oldPeersList(),
resp.newPeersList());
return Status.OK();
}
else {
@@ -159,15 +173,11 @@ public class CliServiceImpl implements CliService {
Requires.requireTrue(!peer.isEmpty(), "Removing peer is blank");
final PeerId leaderId = new PeerId();
- final Status st = getLeader(groupId, conf, leaderId);
+ final Status st = checkLeaderAndConnect(groupId, conf, leaderId);
if (!st.isOk()) {
return st;
}
- if (!this.cliClientService.connect(leaderId)) {
- return new Status(-1, "Fail to init channel to leader %s",
leaderId);
- }
-
RemovePeerRequest req = cliOptions.getRaftMessagesFactory()
.removePeerRequest()
.groupId(groupId)
@@ -179,20 +189,7 @@ public class CliServiceImpl implements CliService {
final Message result = this.cliClientService.removePeer(leaderId,
req, null).get();
if (result instanceof RemovePeerResponse) {
final RemovePeerResponse resp = (RemovePeerResponse) result;
- final Configuration oldConf = new Configuration();
- for (final String peerIdStr : resp.oldPeersList()) {
- final PeerId oldPeer = new PeerId();
- oldPeer.parse(peerIdStr);
- oldConf.addPeer(oldPeer);
- }
- final Configuration newConf = new Configuration();
- for (final String peerIdStr : resp.newPeersList()) {
- final PeerId newPeer = new PeerId();
- newPeer.parse(peerIdStr);
- newConf.addPeer(newPeer);
- }
-
- LOG.info("Configuration of replication group {} changed from
{} to {}", groupId, oldConf, newConf);
+ recordConfigurationChange(groupId, resp.oldPeersList(),
resp.newPeersList());
return Status.OK();
}
else {
@@ -213,15 +210,11 @@ public class CliServiceImpl implements CliService {
Requires.requireNonNull(newPeers, "Null new peers");
final PeerId leaderId = new PeerId();
- final Status st = getLeader(groupId, conf, leaderId);
+ final Status st = checkLeaderAndConnect(groupId, conf, leaderId);
if (!st.isOk()) {
return st;
}
- if (!this.cliClientService.connect(leaderId)) {
- return new Status(-1, "Fail to init channel to leader %s",
leaderId);
- }
-
ChangePeersRequest req = cliOptions.getRaftMessagesFactory()
.changePeersRequest()
.groupId(groupId)
@@ -233,20 +226,7 @@ public class CliServiceImpl implements CliService {
final Message result = this.cliClientService.changePeers(leaderId,
req, null).get();
if (result instanceof ChangePeersResponse) {
final ChangePeersResponse resp = (ChangePeersResponse) result;
- final Configuration oldConf = new Configuration();
- for (final String peerIdStr : resp.oldPeersList()) {
- final PeerId oldPeer = new PeerId();
- oldPeer.parse(peerIdStr);
- oldConf.addPeer(oldPeer);
- }
- final Configuration newConf = new Configuration();
- for (final String peerIdStr : resp.newPeersList()) {
- final PeerId newPeer = new PeerId();
- newPeer.parse(peerIdStr);
- newConf.addPeer(newPeer);
- }
-
- LOG.info("Configuration of replication group {} changed from
{} to {}", groupId, oldConf, newConf);
+ recordConfigurationChange(groupId, resp.oldPeersList(),
resp.newPeersList());
return Status.OK();
}
else {
@@ -386,6 +366,15 @@ public class CliServiceImpl implements CliService {
}
}
+ @Override
+ public Status learner2Follower(final String groupId, final Configuration
conf, final PeerId learner) {
+ Status status = removeLearners(groupId, conf, Arrays.asList(learner));
+ if (status.isOk()) {
+ status = addPeer(groupId, conf, new
PeerId(learner.getConsistentId()));
+ }
+ return status;
+ }
+
@Override
public Status resetLearners(final String groupId, final Configuration
conf, final List<PeerId> learners) {
checkLearnersOpParams(groupId, conf, learners);
@@ -424,15 +413,11 @@ public class CliServiceImpl implements CliService {
Requires.requireNonNull(peer, "Null peer");
final PeerId leaderId = new PeerId();
- final Status st = getLeader(groupId, conf, leaderId);
+ final Status st = checkLeaderAndConnect(groupId, conf, leaderId);
if (!st.isOk()) {
return st;
}
- if (!this.cliClientService.connect(leaderId)) {
- return new Status(-1, "Fail to init channel to leader %s",
leaderId);
- }
-
TransferLeaderRequest rb = cliOptions.getRaftMessagesFactory()
.transferLeaderRequest()
.groupId(groupId)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index be0f8207d9..9456e17565 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -378,6 +378,7 @@ public class FSMCallerImpl implements FSMCaller {
if (task.committedIndex > maxCommittedIndex) {
maxCommittedIndex = task.committedIndex;
}
+ task.reset();
}
else {
if (maxCommittedIndex >= 0) {
@@ -438,6 +439,7 @@ public class FSMCallerImpl implements FSMCaller {
}
finally {
this.nodeMetrics.recordLatency(task.type.metricName(),
Utils.monotonicMs() - startMs);
+ task.reset();
}
}
try {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 4e2a3e5e14..4899a10ece 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -181,7 +181,6 @@ public class NodeImpl implements Node, RaftServerService {
private BallotBox ballotBox;
private SnapshotExecutor snapshotExecutor;
private ReplicatorGroup replicatorGroup;
- private final List<Closure> shutdownContinuations = new ArrayList<>();
private RaftClientService rpcClientService;
private ReadOnlyService readOnlyService;
@@ -286,7 +285,7 @@ public class NodeImpl implements Node, RaftServerService {
if (event.shutdownLatch != null) {
if (!this.tasks.isEmpty()) {
executeApplyingTasks(this.tasks);
- this.tasks.clear();
+ reset();
}
event.shutdownLatch.countDown();
return;
@@ -295,9 +294,16 @@ public class NodeImpl implements Node, RaftServerService {
this.tasks.add(event);
if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch()
|| endOfBatch) {
executeApplyingTasks(this.tasks);
- this.tasks.clear();
+ reset();
}
}
+
+ private void reset() {
+ for (final LogEntryAndClosure task : tasks) {
+ task.reset();
+ }
+ this.tasks.clear();
+ }
}
/**
@@ -1557,18 +1563,21 @@ public class NodeImpl implements Node,
RaftServerService {
final Status st = new Status(RaftError.EPERM,
"expected_term=%d doesn't match current_term=%d",
task.expectedTerm, this.currTerm);
Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.done, st);
+ task.reset();
}
continue;
}
if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
this.conf.isStable() ? null : this.conf.getOldConf(),
task.done)) {
Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.done, new
Status(RaftError.EINTERNAL, "Fail to append task."));
+ task.reset();
continue;
}
// set task entry info before adding to list.
task.entry.getId().setTerm(this.currTerm);
task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
entries.add(task.entry);
+ task.reset();
}
this.logManager.appendEntries(entries, new
LeaderStableClosure(entries));
// update conf.first
@@ -2625,12 +2634,8 @@ public class NodeImpl implements Node, RaftServerService
{
}
private void afterShutdown() {
- List<Closure> savedDoneList = null;
this.writeLock.lock();
try {
- if (!this.shutdownContinuations.isEmpty()) {
- savedDoneList = new ArrayList<>(this.shutdownContinuations);
- }
if (this.logStorage != null) {
this.logStorage.shutdown();
}
@@ -2639,11 +2644,6 @@ public class NodeImpl implements Node, RaftServerService
{
finally {
this.writeLock.unlock();
}
- if (savedDoneList != null) {
- for (final Closure closure : savedDoneList) {
-
Utils.runClosureInThread(this.getOptions().getCommonExecutor(), closure);
- }
- }
}
@Override
@@ -2678,11 +2678,6 @@ public class NodeImpl implements Node, RaftServerService
{
}
}
- @Override
- public void shutdown() {
- shutdown(null);
- }
-
public void onConfigurationChangeDone(final long term) {
this.writeLock.lock();
try {
@@ -3001,7 +2996,7 @@ public class NodeImpl implements Node, RaftServerService {
}
@Override
- public void shutdown(final Closure done) {
+ public void shutdown() {
this.writeLock.lock();
try {
LOG.info("Node {} shutdown, currTerm={} state={}.", getNodeId(),
this.currTerm, this.state);
@@ -3047,20 +3042,6 @@ public class NodeImpl implements Node, RaftServerService
{
}));
}
}
-
- if (this.state != State.STATE_SHUTDOWN) {
- if (done != null) {
- this.shutdownContinuations.add(done);
- }
- return;
- }
-
- // This node is down, it's ok to invoke done right now. Don't
invoke this
- // in place to avoid the dead writeLock issue when done.Run() is
going to acquire
- // a writeLock which is already held by the caller
- if (done != null) {
-
Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done);
- }
}
finally {
this.writeLock.unlock();
@@ -3775,6 +3756,11 @@ public class NodeImpl implements Node, RaftServerService
{
return this.targetPriority;
}
+ @Override
+ public State getNodeState() {
+ return this.state;
+ }
+
@Override
public void describe(final Printer out) {
// node
@@ -3835,8 +3821,8 @@ public class NodeImpl implements Node, RaftServerService {
this.ballotBox.describe(out);
// snapshotExecutor
- out.println("snapshotExecutor: ");
if (this.snapshotExecutor != null) {
+ out.println("snapshotExecutor: ");
this.snapshotExecutor.describe(out);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
index 0ceb11796b..e6aae9f426 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
@@ -97,6 +97,14 @@ public class ReadOnlyServiceImpl implements ReadOnlyService,
LastAppliedLogIndex
CountDownLatch shutdownLatch;
long startTime;
+ private void reset() {
+ this.nodeId = null;
+ this.requestContext = null;
+ this.done = null;
+ this.shutdownLatch = null;
+ this.startTime = 0L;
+ }
+
@Override
public NodeId nodeId() {
return nodeId;
@@ -120,7 +128,7 @@ public class ReadOnlyServiceImpl implements
ReadOnlyService, LastAppliedLogIndex
throws Exception {
if (newEvent.shutdownLatch != null) {
executeReadIndexEvents(this.events);
- this.events.clear();
+ reset();
newEvent.shutdownLatch.countDown();
return;
}
@@ -128,8 +136,15 @@ public class ReadOnlyServiceImpl implements
ReadOnlyService, LastAppliedLogIndex
this.events.add(newEvent);
if (this.events.size() >=
ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
executeReadIndexEvents(this.events);
- this.events.clear();
+ reset();
+ }
+ }
+
+ private void reset() {
+ for (final ReadIndexEvent event : this.events) {
+ event.reset();
}
+ this.events.clear();
}
}
@@ -233,12 +248,14 @@ public class ReadOnlyServiceImpl implements
ReadOnlyService, LastAppliedLogIndex
private void resetPendingStatusError(final Status st) {
this.lock.lock();
try {
- for (final List<ReadIndexStatus> statuses :
this.pendingNotifyStatus.values()) {
+ final Iterator<List<ReadIndexStatus>> it =
this.pendingNotifyStatus.values().iterator();
+ while (it.hasNext()) {
+ final List<ReadIndexStatus> statuses = it.next();
for (final ReadIndexStatus status : statuses) {
reportError(status, st);
}
+ it.remove();
}
- this.pendingNotifyStatus.clear();
}
finally {
this.lock.unlock();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index 68224939f3..baecbe8e36 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -16,6 +16,7 @@
*/
package org.apache.ignite.raft.jraft.core;
+import static com.codahale.metrics.MetricRegistry.name;
import static java.util.stream.Collectors.toList;
import com.codahale.metrics.Gauge;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.closure.CatchUpClosure;
+import
org.apache.ignite.raft.jraft.core.Replicator.ReplicatorStateListener.ReplicatorState;
import org.apache.ignite.raft.jraft.entity.EntryMetaBuilder;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.LogEntry;
@@ -86,6 +88,7 @@ public class Replicator implements ThreadId.OnError {
private long timeoutNowIndex;
private volatile long lastRpcSendTimestamp;
private volatile long heartbeatCounter = 0;
+ private volatile long probeCounter = 0;
private volatile long appendEntriesCounter = 0;
private volatile long installSnapshotCounter = 0;
protected Stat statInfo = new Stat();
@@ -122,6 +125,8 @@ public class Replicator implements ThreadId.OnError {
// Pending response queue;
private final PriorityQueue<RpcResponse> pendingResponses = new
PriorityQueue<>(50);
+ private final String metricName;
+
private int getAndIncrementReqSeq() {
final int prev = this.reqSeq;
this.reqSeq++;
@@ -141,9 +146,10 @@ public class Replicator implements ThreadId.OnError {
}
/**
- * Replicator state
+ * Replicator internal state
*/
public enum State {
+ Created,
Probe, // probe follower state
Snapshot, // installing snapshot to follower
Replicate, // replicate logs normally
@@ -158,6 +164,8 @@ public class Replicator implements ThreadId.OnError {
this.timerManager = replicatorOptions.getTimerManager();
this.raftOptions = raftOptions;
this.rpcService = replicatorOptions.getRaftRpcService();
+ this.metricName = getReplicatorMetricName(replicatorOptions);
+ setState(State.Created);
}
/**
@@ -180,6 +188,7 @@ public class Replicator implements ThreadId.OnError {
gauges.put("next-index", (Gauge<Long>) () -> this.r.nextIndex);
gauges.put("heartbeat-times", (Gauge<Long>) () ->
this.r.heartbeatCounter);
gauges.put("install-snapshot-times", (Gauge<Long>) () ->
this.r.installSnapshotCounter);
+ gauges.put("probe-times", (Gauge<Long>) () -> this.r.probeCounter);
gauges.put("append-entries-times", (Gauge<Long>) () ->
this.r.appendEntriesCounter);
return gauges;
}
@@ -198,7 +207,8 @@ public class Replicator implements ThreadId.OnError {
enum ReplicatorEvent {
CREATED, // created
ERROR, // error
- DESTROYED // destroyed
+ DESTROYED , // destroyed
+ STATE_CHANGED; // state changed.
}
/**
@@ -206,6 +216,27 @@ public class Replicator implements ThreadId.OnError {
* when replicator created, destroyed or had some errors.
*/
public interface ReplicatorStateListener {
+ /**
+ * Represents state changes in the replicator.
+ */
+ enum ReplicatorState {
+ /**
+ * The replicator is created.
+ */
+ CREATED,
+ /**
+ * The replicator is destroyed.
+ */
+ DESTROYED,
+ /**
+ * The replicator begins to do its job (replicating logs or
installing snapshots).
+ */
+ ONLINE,
+ /**
+ * The replicator is suspended due to a raft error or a lost connection.
+ */
+ OFFLINE
+ }
/**
* Called when this replicator has been created.
@@ -228,10 +259,22 @@ public class Replicator implements ThreadId.OnError {
* @param peer replicator related peerId
*/
void onDestroyed(final PeerId peer);
+
+ /**
+ * Called when the replicator state is changed. See {@link
ReplicatorState}.
+ * @param peer the replicator's peer id.
+ * @param newState the new replicator state.
+ */
+ default void stateChanged(final PeerId peer, final ReplicatorState
newState) {}
+ }
+
+ private static void notifyReplicatorStatusListener(final Replicator
replicator, final ReplicatorEvent event,
+ final Status status) {
+ notifyReplicatorStatusListener(replicator, event, status, null);
}
/**
- * Notify replicator event(such as created, error, destroyed) to
replicatorStateListener which is implemented by
+ * Notify replicator event (such as created, error, destroyed) to
replicatorStateListener which is implemented by
* users.
*
* @param replicator replicator object
@@ -239,7 +282,7 @@ public class Replicator implements ThreadId.OnError {
* @param status replicator's error detailed status
*/
private static void notifyReplicatorStatusListener(final Replicator
replicator, final ReplicatorEvent event,
- final Status status) {
+ final Status status, final ReplicatorState newState) {
final ReplicatorOptions replicatorOpts =
Requires.requireNonNull(replicator.getOpts(), "replicatorOptions");
final Node node = Requires.requireNonNull(replicatorOpts.getNode(),
"node");
final PeerId peer =
Requires.requireNonNull(replicatorOpts.getPeerId(), "peer");
@@ -259,6 +302,8 @@ public class Replicator implements ThreadId.OnError {
case DESTROYED:
Utils.runInThread(replicatorOpts.getCommonExecutor(), () ->
listener.onDestroyed(peer));
break;
+ case STATE_CHANGED:
+
Utils.runInThread(replicatorOpts.getCommonExecutor(), () ->
listener.stateChanged(peer, newState));
default:
break;
}
@@ -387,14 +432,36 @@ public class Replicator implements ThreadId.OnError {
return this.inflights;
}
- @OnlyForTest
State getState() {
return this.state;
}
- @OnlyForTest
void setState(final State state) {
+ State oldState = this.state;
this.state = state;
+
+ if (oldState != state) {
+ ReplicatorState newState = null;
+ switch (state) {
+ case Created:
+ newState = ReplicatorState.CREATED;
+ break;
+ case Replicate:
+ case Snapshot:
+ newState = ReplicatorState.ONLINE;
+ break;
+ case Probe:
+ newState = ReplicatorState.OFFLINE;
+ break;
+ case Destroyed:
+ newState = ReplicatorState.DESTROYED;
+ break;
+ }
+
+ if (newState != null) {
+ notifyReplicatorStatusListener(this,
ReplicatorEvent.STATE_CHANGED, null, newState);
+ }
+ }
}
@OnlyForTest
@@ -481,7 +548,7 @@ public class Replicator implements ThreadId.OnError {
final int seq, final Future<Message> rpcInfly) {
this.rpcInFly = new Inflight(reqType, startIndex, count, size, seq,
rpcInfly);
this.inflights.add(this.rpcInFly);
- this.nodeMetrics.recordSize("replicate-inflights-count",
this.inflights.size());
+ this.nodeMetrics.recordSize(name(this.metricName,
"replicate-inflights-count"), this.inflights.size());
}
/**
@@ -522,22 +589,27 @@ public class Replicator implements ThreadId.OnError {
}
void installSnapshot() {
- if (this.state == State.Snapshot) {
+ if (getState() == State.Snapshot) {
LOG.warn("Replicator {} is installing snapshot, ignore the new
request.", this.options.getPeerId());
- this.id.unlock();
+ unlockId();
return;
}
boolean doUnlock = true;
+ if (!rpcService.connect(options.getPeerId())) {
+ LOG.error("Fail to check install snapshot connection to peer={},
give up to send install snapshot request.", options.getPeerId());
+ block(Utils.nowMs(), RaftError.EHOSTDOWN.getNumber());
+ return;
+ }
try {
Requires.requireTrue(this.reader == null,
"Replicator %s already has a snapshot reader, current state is
%s", this.options.getPeerId(),
- this.state);
+ getState());
this.reader = this.options.getSnapshotStorage().open();
if (this.reader == null) {
final NodeImpl node = this.options.getNode();
final RaftException error = new
RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
error.setStatus(new Status(RaftError.EIO, "Fail to open
snapshot"));
- this.id.unlock();
+ unlockId();
doUnlock = false;
node.onError(error);
return;
@@ -548,7 +620,7 @@ public class Replicator implements ThreadId.OnError {
final RaftException error = new
RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
error.setStatus(new Status(RaftError.EIO, "Fail to generate
uri for snapshot reader"));
releaseReader();
- this.id.unlock();
+ unlockId();
doUnlock = false;
node.onError(error);
return;
@@ -560,7 +632,7 @@ public class Replicator implements ThreadId.OnError {
final RaftException error = new
RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
error.setStatus(new Status(RaftError.EIO, "Fail to load meta
from %s", snapshotPath));
releaseReader();
- this.id.unlock();
+ unlockId();
doUnlock = false;
node.onError(error);
return;
@@ -580,7 +652,7 @@ public class Replicator implements ThreadId.OnError {
this.statInfo.lastLogIncluded = meta.lastIncludedIndex();
this.statInfo.lastTermIncluded = meta.lastIncludedTerm();
- this.state = State.Snapshot;
+ setState(State.Snapshot);
// noinspection NonAtomicOperationOnVolatileField
this.installSnapshotCounter++;
final long monotonicSendTimeMs = Utils.monotonicMs();
@@ -598,7 +670,7 @@ public class Replicator implements ThreadId.OnError {
}
finally {
if (doUnlock) {
- this.id.unlock();
+ unlockId();
}
}
}
@@ -643,7 +715,7 @@ public class Replicator implements ThreadId.OnError {
if (!success) {
//should reset states
r.resetInflights();
- r.state = State.Probe;
+ r.setState(State.Probe);
r.block(Utils.nowMs(), status.getCode());
return false;
}
@@ -653,7 +725,7 @@ public class Replicator implements ThreadId.OnError {
r.sendTimeoutNow(false, false);
}
// id is unlock in _send_entriesheartbeatCounter
- r.state = State.Replicate;
+ r.setState(State.Replicate);
return true;
}
@@ -714,8 +786,8 @@ public class Replicator implements ThreadId.OnError {
this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
this.statInfo.firstLogIndex = this.nextIndex;
this.statInfo.lastLogIndex = this.nextIndex - 1;
- this.appendEntriesCounter++;
- this.state = State.Probe;
+ this.probeCounter++;
+ setState(State.Probe);
final int stateVersion = this.version;
final int seq = getAndIncrementReqSeq();
final Future<Message> rpcFuture =
this.rpcService.appendEntries(this.options.getPeerId(),
@@ -733,7 +805,7 @@ public class Replicator implements ThreadId.OnError {
.getNodeId(), this.options.getPeerId(),
this.options.getTerm(), request.committedIndex());
}
finally {
- this.id.unlock();
+ unlockId();
}
}
@@ -798,9 +870,8 @@ public class Replicator implements ThreadId.OnError {
final MetricRegistry metricRegistry =
opts.getNode().getNodeMetrics().getMetricRegistry();
if (metricRegistry != null) {
try {
- final String replicatorMetricName =
getReplicatorMetricName(opts);
- if (!metricRegistry.getNames().contains(replicatorMetricName))
{
- metricRegistry.register(replicatorMetricName, new
ReplicatorMetricSet(opts, r));
+ if (!metricRegistry.getNames().contains(r.metricName)) {
+ metricRegistry.register(r.metricName, new
ReplicatorMetricSet(opts, r));
}
}
catch (final IllegalArgumentException e) {
@@ -817,11 +888,11 @@ public class Replicator implements ThreadId.OnError {
r.lastRpcSendTimestamp = Utils.monotonicMs();
r.startHeartbeatTimer(Utils.nowMs());
// id.unlock in sendEmptyEntries
- r.sendEmptyEntries(false);
+ r.sendProbeRequest();
return r.id;
}
- private static String getReplicatorMetricName(final ReplicatorOptions
opts) {
+ private String getReplicatorMetricName(final ReplicatorOptions opts) {
return "replicator-" + opts.getNode().getGroupId() + "/" +
opts.getPeerId();
}
@@ -853,7 +924,7 @@ public class Replicator implements ThreadId.OnError {
@Override
public String toString() {
- return "Replicator [state=" + this.state + ", statInfo=" +
this.statInfo + ", peerId="
+ return "Replicator [state=" + getState() + ", statInfo=" +
this.statInfo + ", peerId="
+ this.options.getPeerId() + ", type=" +
this.options.getReplicatorType() + "]";
}
@@ -903,7 +974,7 @@ public class Replicator implements ThreadId.OnError {
// _next_index otherwise the replicator is likely waits in
executor.shutdown();
// _wait_more_entries and no further logs would be replicated even
if the
// last_index of this followers is less than |next_index - 1|
- r.sendEmptyEntries(false);
+ r.sendProbeRequest();
}
else if (errCode != RaftError.ESTOP.getNumber()) {
// id is unlock in _send_entries
@@ -929,7 +1000,7 @@ public class Replicator implements ThreadId.OnError {
// fine now.
if (this.blockTimer != null) {
// already in blocking state,return immediately.
- this.id.unlock();
+ unlockId();
return;
}
final long dueTime = startTimeMs +
this.options.getDynamicHeartBeatTimeoutMs();
@@ -938,13 +1009,13 @@ public class Replicator implements ThreadId.OnError {
this.blockTimer = this.timerManager.schedule(() ->
onBlockTimeout(this.id, this.options.getCommonExecutor()),
dueTime - Utils.nowMs(), TimeUnit.MILLISECONDS);
this.statInfo.runningState = RunningState.BLOCKING;
- this.id.unlock();
+ unlockId();
}
catch (final Exception e) {
this.blockTimer = null;
LOG.error("Fail to add timer", e);
// id unlock in sendEmptyEntries.
- sendEmptyEntries(false);
+ sendProbeRequest();
}
}
@@ -1066,9 +1137,9 @@ public class Replicator implements ThreadId.OnError {
// Unregister replicator metric set
if (this.nodeMetrics.isEnabled()) {
this.nodeMetrics.getMetricRegistry() //
-
.removeMatching(MetricFilter.startsWith(getReplicatorMetricName(this.options)));
+ .removeMatching(MetricFilter.startsWith(this.metricName));
}
- this.state = State.Destroyed;
+ setState(State.Destroyed);
notifyReplicatorStatusListener((Replicator) savedId.getData(),
ReplicatorEvent.DESTROYED);
savedId.unlockAndDestroy();
// Avoid nulling id because it's used to sync replicator state on
destroy.
@@ -1117,7 +1188,7 @@ public class Replicator implements ThreadId.OnError {
.append(status);
LOG.debug(sb.toString());
}
- r.state = State.Probe;
+ r.setState(State.Probe);
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR,
status);
if (++r.consecutiveErrorTimes % 10 == 0) {
LOG.warn("Fail to issue RPC to {},
consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
@@ -1154,7 +1225,7 @@ public class Replicator implements ThreadId.OnError {
}
LOG.warn("Heartbeat to peer {} failure, try to send a probe
request.", r.options.getPeerId());
doUnlock = false;
- r.sendEmptyEntries(false);
+ r.sendProbeRequest();
r.startHeartbeatTimer(startTimeMs);
return;
}
@@ -1201,8 +1272,8 @@ public class Replicator implements ThreadId.OnError {
LOG.warn("Too many pending responses {} for replicator {},
maxReplicatorInflightMsgs={}",
holdingQueue.size(), r.options.getPeerId(),
r.raftOptions.getMaxReplicatorInflightMsgs());
r.resetInflights();
- r.state = State.Probe;
- r.sendEmptyEntries(false);
+ r.setState(State.Probe);
+ r.sendProbeRequest();
return;
}
@@ -1255,7 +1326,7 @@ public class Replicator implements ThreadId.OnError {
"Replicator {} response sequence out of order, expect
{}, but it is {}, reset state to try again.",
r, inflight.seq, queuedPipelinedResponse.seq);
r.resetInflights();
- r.state = State.Probe;
+ r.setState(State.Probe);
continueSendEntries = false;
r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
return;
@@ -1320,9 +1391,9 @@ public class Replicator implements ThreadId.OnError {
"Replicator {} received invalid AppendEntriesResponse,
in-flight startIndex={}, request prevLogIndex={}, reset the replicator state
and probe again.",
r, inflight.startIndex, request.prevLogIndex());
r.resetInflights();
- r.state = State.Probe;
+ r.setState(State.Probe);
// unlock id in sendEmptyEntries
- r.sendEmptyEntries(false);
+ r.sendProbeRequest();
return false;
}
// record metrics
@@ -1365,7 +1436,7 @@ public class Replicator implements ThreadId.OnError {
r.consecutiveErrorTimes, status);
}
r.resetInflights();
- r.state = State.Probe;
+ r.setState(State.Probe);
// unlock in in block
r.block(startTimeMs, status.getCode());
return false;
@@ -1416,7 +1487,7 @@ public class Replicator implements ThreadId.OnError {
}
}
// dummy_id is unlock in _send_heartbeat
- r.sendEmptyEntries(false);
+ r.sendProbeRequest();
return false;
}
if (isLogDebugEnabled) {
@@ -1426,7 +1497,7 @@ public class Replicator implements ThreadId.OnError {
// success
if (response.term() != r.options.getTerm()) {
r.resetInflights();
- r.state = State.Probe;
+ r.setState(State.Probe);
LOG.error("Fail, response term {} dismatch, expect term {}",
response.term(), r.options.getTerm());
id.unlock();
return false;
@@ -1446,7 +1517,7 @@ public class Replicator implements ThreadId.OnError {
}
}
- r.state = State.Replicate;
+ r.setState(State.Replicate);
r.blockTimer = null;
r.nextIndex += entriesSize;
r.hasSucceeded = true;
@@ -1497,7 +1568,7 @@ public class Replicator implements ThreadId.OnError {
this.statInfo.runningState = RunningState.IDLE;
}
finally {
- this.id.unlock();
+ unlockId();
}
}
@@ -1527,7 +1598,7 @@ public class Replicator implements ThreadId.OnError {
}
finally {
if (doUnlock) {
- this.id.unlock();
+ unlockId();
}
}
@@ -1606,6 +1677,7 @@ public class Replicator implements ThreadId.OnError {
final long monotonicSendTimeMs = Utils.monotonicMs();
final int seq = getAndIncrementReqSeq();
+ this.appendEntriesCounter++;
Future<Message> rpcFuture = null;
try {
rpcFuture =
this.rpcService.appendEntries(this.options.getPeerId(), request, -1,
@@ -1647,6 +1719,10 @@ public class Replicator implements ThreadId.OnError {
r.sendEmptyEntries(true, closure);
}
+ private void sendProbeRequest() {
+ sendEmptyEntries(false);
+ }
+
private static void sendHeartbeat(final ThreadId id) {
final Replicator r = (Replicator) id.lock();
if (r == null) {
@@ -1683,7 +1759,7 @@ public class Replicator implements ThreadId.OnError {
}
finally {
if (unlockId) {
- this.id.unlock();
+ unlockId();
}
}
@@ -1793,7 +1869,7 @@ public class Replicator implements ThreadId.OnError {
// Register log_index so that _on_rpc_return trigger
// _send_timeout_now if _next_index reaches log_index
this.timeoutNowIndex = logIndex;
- this.id.unlock();
+ unlockId();
return true;
}
@@ -1830,4 +1906,11 @@ public class Replicator implements ThreadId.OnError {
return nextIdx;
}
+ private void unlockId() {
+ if (this.id == null) {
+ return;
+ }
+ this.id.unlock();
+ }
+
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Checksum.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Checksum.java
index 87d148e072..19c3519afc 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Checksum.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Checksum.java
@@ -16,6 +16,8 @@
*/
package org.apache.ignite.raft.jraft.entity;
+import java.util.Collection;
+
/**
* Checksum for entity.
*/
@@ -38,4 +40,20 @@ public interface Checksum {
default long checksum(final long v1, final long v2) {
return v1 ^ v2;
}
+
+ /**
+ * Returns the checksum value of act on factors.
+ *
+ * @param factors checksum collection
+ * @param v origin checksum
+ * @return checksum value
+ */
+ default long checksum(final Collection<? extends Checksum> factors, long
v) {
+ if (factors != null && !factors.isEmpty()) {
+ for (final Checksum factor : factors) {
+ v = checksum(v, factor.checksum());
+ }
+ }
+ return v;
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java
index e9b010c85d..8022e24bd1 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java
@@ -17,7 +17,6 @@
package org.apache.ignite.raft.jraft.entity;
import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.List;
import org.apache.ignite.raft.jraft.util.CrcUtil;
@@ -25,6 +24,8 @@ import org.apache.ignite.raft.jraft.util.CrcUtil;
* A replica log entry.
*/
public class LogEntry implements Checksum {
+ public static final ByteBuffer EMPTY_DATA = ByteBuffer.wrap(new byte[0]);
+
/** entry type */
private EnumOutter.EntryType type;
/** log id with index/term */
@@ -38,7 +39,7 @@ public class LogEntry implements Checksum {
/** log entry old learners */
private List<PeerId> oldLearners;
/** entry data */
- private ByteBuffer data;
+ private ByteBuffer data = EMPTY_DATA;
/** checksum for log entry */
private long checksum;
/** true when the log has checksum **/
@@ -77,25 +78,16 @@ public class LogEntry implements Checksum {
@Override
public long checksum() {
long c = checksum(this.type.getNumber(), this.id.checksum());
- c = checksumPeers(this.peers, c);
- c = checksumPeers(this.oldPeers, c);
- c = checksumPeers(this.learners, c);
- c = checksumPeers(this.oldLearners, c);
+ c = checksum(this.peers, c);
+ c = checksum(this.oldPeers, c);
+ c = checksum(this.learners, c);
+ c = checksum(this.oldLearners, c);
if (this.data != null && this.data.hasRemaining()) {
c = checksum(c, CrcUtil.crc64(this.data));
}
return c;
}
- private long checksumPeers(final Collection<PeerId> peers, long c) {
- if (peers != null && !peers.isEmpty()) {
- for (final PeerId peer : peers) {
- c = checksum(c, peer.checksum());
- }
- }
- return c;
- }
-
/**
* Returns whether the log entry has a checksum.
*
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Task.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Task.java
index 89663a005e..9574acc381 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Task.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/Task.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.closure.JoinableClosure;
+import org.apache.ignite.raft.jraft.util.Requires;
/**
* Basic message structure of jraft, contains:
@@ -39,7 +40,7 @@ public class Task implements Serializable {
private static final long serialVersionUID = 2971309899898274575L;
/** Associated task data */
- private ByteBuffer data;
+ private ByteBuffer data = LogEntry.EMPTY_DATA;;
/** task closure, called when the data is successfully committed to the
raft group or failures happen. */
private Closure done;
/**
@@ -55,7 +56,7 @@ public class Task implements Serializable {
/**
* Creates a task with data/done.
*/
- public Task(ByteBuffer data, Closure done) {
+ public Task(final ByteBuffer data, final Closure done) {
super();
this.data = data;
this.done = done;
@@ -64,7 +65,7 @@ public class Task implements Serializable {
/**
* Creates a task with data/done/expectedTerm.
*/
- public Task(ByteBuffer data, Closure done, long expectedTerm) {
+ public Task(final ByteBuffer data, final Closure done, final long
expectedTerm) {
super();
this.data = data;
this.done = done;
@@ -75,7 +76,8 @@ public class Task implements Serializable {
return this.data;
}
- public void setData(ByteBuffer data) {
+ public void setData(final ByteBuffer data) {
+ Requires.requireNonNull(data, "data should not be null, you can use
LogEntry.EMPTY_DATA instead.");
this.data = data;
}
@@ -83,7 +85,7 @@ public class Task implements Serializable {
return this.done;
}
- public void setDone(Closure done) {
+ public void setDone(final Closure done) {
this.done = done;
}
@@ -91,7 +93,7 @@ public class Task implements Serializable {
return this.expectedTerm;
}
- public void setExpectedTerm(long expectedTerm) {
+ public void setExpectedTerm(final long expectedTerm) {
this.expectedTerm = expectedTerm;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/InvokeContext.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/InvokeContext.java
index 720260b0a4..a854fc34b3 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/InvokeContext.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/InvokeContext.java
@@ -31,6 +31,10 @@ public class InvokeContext {
return this.ctx.put(key, value);
}
+ public Object putIfAbsent(final String key, final Object value) {
+ return this.ctx.putIfAbsent(key, value);
+ }
+
public <T> T get(final String key) {
return (T) this.ctx.get(key);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
index a954cf6453..9f476c1413 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java
@@ -67,13 +67,23 @@ public class DefaultRaftClientService extends
AbstractClientService implements R
@Override
public Future<Message> preVote(final PeerId peerId, final
RequestVoteRequest request,
final RpcResponseClosure<RequestVoteResponse> done) {
- return invokeWithDone(peerId, request, done,
this.nodeOptions.getElectionTimeoutMs());
+
+ if (connect(peerId)) {
+ return invokeWithDone(peerId, request, done,
this.nodeOptions.getElectionTimeoutMs());
+ }
+
+ return onConnectionFail(rpcExecutor, request, done, peerId);
}
@Override
public Future<Message> requestVote(final PeerId peerId, final
RequestVoteRequest request,
final RpcResponseClosure<RequestVoteResponse> done) {
- return invokeWithDone(peerId, request, done,
this.nodeOptions.getElectionTimeoutMs());
+
+ if (connect(peerId)) {
+ return invokeWithDone(peerId, request, done,
this.nodeOptions.getElectionTimeoutMs());
+ }
+
+ return onConnectionFail(rpcExecutor, request, done, peerId);
}
@Override
@@ -88,7 +98,7 @@ public class DefaultRaftClientService extends
AbstractClientService implements R
return invokeWithDone(peerId, request, done, timeoutMs, executor);
}
- return failedFuture(executor, request, done, peerId);
+ return onConnectionFail(executor, request, done, peerId);
}
@Override
@@ -109,7 +119,7 @@ public class DefaultRaftClientService extends
AbstractClientService implements R
return invokeWithDone(peerId, request, done,
this.rpcOptions.getRpcInstallSnapshotTimeout());
}
- return failedFuture(rpcExecutor, request, done, peerId);
+ return onConnectionFail(rpcExecutor, request, done, peerId);
}
@Override
@@ -131,22 +141,22 @@ public class DefaultRaftClientService extends
AbstractClientService implements R
* @param peerId The Peer ID.
* @return The future.
*/
- private Future<Message> failedFuture(Executor executor, Message request,
RpcResponseClosure<?> done, PeerId peerId) {
+ private Future<Message> onConnectionFail(Executor executor, Message
request, RpcResponseClosure<?> done, PeerId peerId) {
// fail-fast when no connection
final CompletableFuture<Message> future = new CompletableFuture<>();
executor.execute(() -> {
+ final String fmt = "Check connection[%s] fail and try to create
new one";
if (done != null) {
try {
- done.run(new Status(RaftError.EINTERNAL, "Check
connection[%s] fail and try to create new one", peerId));
+ done.run(new Status(RaftError.EINTERNAL, fmt, peerId));
}
catch (final Throwable t) {
LOG.error("Fail to run RpcResponseClosure, the request is
{}.", t, request);
}
}
- future.completeExceptionally(new RemotingException("Check
connection[" +
- peerId + "] fail and try to create new one"));
+ future.completeExceptionally(new
RemotingException(String.format(fmt, peerId)));
});
return future;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index 2b898dfc52..906b573fa7 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -510,9 +510,13 @@ public class LogManagerImpl implements LogManager {
this.lastId = this.ab.flush();
setDiskId(this.lastId);
LogManagerImpl.this.shutDownLatch.countDown();
+ event.reset();
return;
}
final StableClosure done = event.done;
+ final EventType eventType = event.type;
+
+ event.reset();
if (done.getEntries() != null && !done.getEntries().isEmpty()) {
this.ab.append(done);
@@ -520,7 +524,7 @@ public class LogManagerImpl implements LogManager {
else {
this.lastId = this.ab.flush();
boolean ret = true;
- switch (event.type) {
+ switch (eventType) {
case LAST_LOG_ID:
((LastLogIdClosure)
done).setLastLogId(this.lastId.copy());
break;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
index 6838439205..97f08d6a46 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
@@ -201,7 +201,7 @@ public class LocalSnapshotStorage implements
SnapshotStorage {
}
}
catch (final IOException e) {
- LOG.error("Fail to sync writer {}.", writer.getPath());
+ LOG.error("Fail to sync writer {}.", writer.getPath(), e);
ret = RaftError.EIO.getNumber();
ioe = e;
break;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySession.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySession.java
index cf36af4436..fe4d756d1c 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySession.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySession.java
@@ -109,6 +109,10 @@ public class CopySession implements Session {
if (!this.finished) {
Utils.closeQuietly(this.outputStream);
}
+ if (null != this.destBuf) {
+ this.destBuf.recycle();
+ this.destBuf = null;
+ }
}
finally {
this.lock.unlock();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SegmentList.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SegmentList.java
index 1911f9091a..44d02912a5 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SegmentList.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SegmentList.java
@@ -21,11 +21,11 @@ import java.util.Collection;
import java.util.function.Predicate;
/**
- * A list implementation based on segments.Only supports removing elements
from start or end. The list keep the elements
+ * A list implementation based on segments. Only supports removing elements
from start or end. The list keep the elements
* in a segment list, every segment contains at most 128 elements.
*
* [segment, segment, segment ...] / | \
segment segment
- * segment [0, 1 ... 127] [128, 129 ... 255] [256, 1 ... 383]
+ * segment [0, 1 ... 127] [128, 129 ... 255] [256, 257 ... 383]
*/
public class SegmentList<T> {
private static final int SEGMENT_SHIFT = 7;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/StorageOptionsFactory.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/StorageOptionsFactory.java
index ae168aefb0..220e331fc1 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/StorageOptionsFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/StorageOptionsFactory.java
@@ -113,11 +113,20 @@ public final class StorageOptionsFactory {
// If true, missing column families will be automatically created.
opts.setCreateMissingColumnFamilies(true);
- // Number of open files that can be used by the DB. You may need to
increase
+ // Number of open files that can be used by the DB. You may need to
increase
// this if your database has a large working set. Value -1 means files
opened
// are always kept open.
opts.setMaxOpenFiles(-1);
+ // To limit the num of LOG. Once LOG exceed this num, RocksDB will
delete old LOG
+ // automatically.
+ opts.setKeepLogFileNum(100);
+
+ // To limit the size of WALs. Once WALs exceed this size, RocksDB will
start
+ // forcing the flush of column families to allow deletion of some
oldest WALs.
+ // We make it 1G as default.
+ opts.setMaxTotalWalSize(1 << 30);
+
// The maximum number of concurrent background compactions. The
default is 1,
// but to fully utilize your CPU and storage you might want to
increase this
// to approximately number of cores in the system.
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
index 6b1c208763..0a27cc4894 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
@@ -23,8 +23,11 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
@@ -368,6 +371,41 @@ public final class Utils {
}
}
+ /**
+ * Unmap mappedByteBuffer
+ * See
https://stackoverflow.com/questions/2972986/how-to-unmap-a-file-from-memory-mapped-using-filechannel-in-java
+ */
+ public static void unmap(final MappedByteBuffer cb) {
+ // JavaSpecVer: 1.6, 1.7, 1.8, 9, 10
+ final boolean isOldJDK =
System.getProperty("java.specification.version", "99").startsWith("1.");
+ try {
+ if (isOldJDK) {
+ final Method cleaner = cb.getClass().getMethod("cleaner");
+ cleaner.setAccessible(true);
+ final Method clean =
Class.forName("sun.misc.Cleaner").getMethod("clean");
+ clean.setAccessible(true);
+ clean.invoke(cleaner.invoke(cb));
+ } else {
+ Class unsafeClass;
+ try {
+ unsafeClass = Class.forName("sun.misc.Unsafe");
+ } catch (final Exception ex) {
+ // jdk.internal.misc.Unsafe doesn't yet have an
invokeCleaner() method,
+ // but that method should be added if sun.misc.Unsafe is
removed.
+ unsafeClass = Class.forName("jdk.internal.misc.Unsafe");
+ }
+ final Method clean = unsafeClass.getMethod("invokeCleaner",
ByteBuffer.class);
+ clean.setAccessible(true);
+ final Field theUnsafeField =
unsafeClass.getDeclaredField("theUnsafe");
+ theUnsafeField.setAccessible(true);
+ final Object theUnsafe = theUnsafeField.get(null);
+ clean.invoke(theUnsafe, cb);
+ }
+ } catch (final Exception ex) {
+ LOG.error("Fail to un-mapped segment file.", ex);
+ }
+ }
+
public static String getString(final byte[] bs, final int off, final int
len) {
return new String(bs, off, len, StandardCharsets.UTF_8);
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
index 40bda369e7..5399bfda20 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
@@ -184,10 +184,10 @@ public class ReplicatorTest {
assertNotNull(r);
assertSame(r.getOpts(), this.opts);
Set<String> metrics =
this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
- assertEquals(6, metrics.size());
+ assertEquals(7, metrics.size());
r.destroy();
metrics =
this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
- assertEquals(1, metrics.size());
+ assertEquals(0, metrics.size());
}
private Replicator getReplicator() {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
index cab5683a92..4b7b5e70e3 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
@@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class LogEntryTest {
@@ -36,7 +37,7 @@ public class LogEntryTest {
LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
entry.setId(new LogId(100, 3));
entry.setPeers(Arrays.asList(new PeerId("localhost", 99, 1), new
PeerId("localhost", 100, 2)));
- assertNull(entry.getData());
+ assertSame(LogEntry.EMPTY_DATA, entry.getData());
assertNull(entry.getOldPeers());
DefaultLogEntryCodecFactory factory =
DefaultLogEntryCodecFactory.getInstance();
@@ -55,7 +56,7 @@ public class LogEntryTest {
assertEquals(2, nentry.getPeers().size());
assertEquals("localhost:99:1", nentry.getPeers().get(0).toString());
assertEquals("localhost:100:2", nentry.getPeers().get(1).toString());
- assertNull(nentry.getData());
+ assertSame(LogEntry.EMPTY_DATA, entry.getData());
assertNull(nentry.getOldPeers());
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/BaseLogEntryCodecFactoryTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/BaseLogEntryCodecFactoryTest.java
index cf190abdf6..6e067c4ceb 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/BaseLogEntryCodecFactoryTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/BaseLogEntryCodecFactoryTest.java
@@ -30,6 +30,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -47,6 +48,24 @@ public abstract class BaseLogEntryCodecFactoryTest {
protected abstract LogEntryCodecFactory newFactory();
+ @Test
+ public void testEmptyData() {
+ LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
+ entry.setId(new LogId(100, 3));
+ entry.setPeers(Arrays.asList(new PeerId("localhost", 99, 1), new
PeerId("localhost", 100, 2)));
+ entry.setData(ByteBuffer.allocate(0));
+
+ byte[] content = this.encoder.encode(entry);
+
+ assertNotNull(content);
+ assertTrue(content.length > 0);
+
+ LogEntry nentry = this.decoder.decode(content);
+ assertNotNull(nentry);
+ assertNotNull(nentry.getData());
+ assertEquals(0, nentry.getData().remaining());
+ }
+
@Test
public void testEncodeDecodeEmpty() {
try {
@@ -65,7 +84,7 @@ public abstract class BaseLogEntryCodecFactoryTest {
LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
entry.setId(new LogId(100, 3));
entry.setPeers(Arrays.asList(new PeerId("localhost", 99, 1), new
PeerId("localhost", 100, 2)));
- assertNull(entry.getData());
+ assertSame(LogEntry.EMPTY_DATA, entry.getData());
assertNull(entry.getOldPeers());
byte[] content = this.encoder.encode(entry);
@@ -82,7 +101,7 @@ public abstract class BaseLogEntryCodecFactoryTest {
assertEquals(2, nentry.getPeers().size());
assertEquals("localhost:99:1", nentry.getPeers().get(0).toString());
assertEquals("localhost:100:2", nentry.getPeers().get(1).toString());
- assertNull(nentry.getData());
+ assertSame(LogEntry.EMPTY_DATA, nentry.getData());
assertNull(nentry.getOldPeers());
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 3eea7b73c1..f1699b41ab 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -247,9 +247,11 @@ public class ItRaftCommandLeftInLogUntilRestartTest
extends AbstractBasicIntegra
return;
}
+ long idx = event.committedIndex;
+
handler.onEvent(event, sequence, endOfBatch);
- appliedIndex.set(event.committedIndex);
+ appliedIndex.set(idx);
}, exceptionHandler);
}
});