This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr by this push:
new 4c2d92e fix tests
4c2d92e is described below
commit 4c2d92ee5b1a1326e04052966bd8a117a202e978
Author: jt <[email protected]>
AuthorDate: Wed Jan 5 18:27:37 2022 +0800
fix tests
---
.../apache/iotdb/cluster/log/LogDispatcher.java | 2 ++
.../iotdb/cluster/partition/PartitionTable.java | 7 ++++
.../cluster/partition/slot/SlotPartitionTable.java | 2 +-
.../cluster/server/member/MetaGroupMember.java | 4 +--
.../iotdb/cluster/server/member/RaftMember.java | 2 ++
.../cluster/server/service/DataGroupEngine.java | 16 +++++----
.../cluster/server/service/MetaAsyncService.java | 9 +++--
.../server/heartbeat/MetaHeartbeatThreadTest.java | 5 +++
.../iotdb/cluster/server/member/BaseMember.java | 10 ++++--
.../cluster/server/member/MetaGroupMemberTest.java | 39 ++++++++++++++++------
10 files changed, 70 insertions(+), 26 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 2626d69..8392289 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -428,6 +428,8 @@ public class LogDispatcher {
try {
long operationStartTime =
Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
for (int i = 0; i < retries; i++) {
+ logRequest.getVotingLog().getFailedNodeIds().remove(receiver.nodeIdentifier);
+ logRequest.getVotingLog().getStronglyAcceptedNodeIds().remove(Integer.MAX_VALUE);
AppendEntryResult result =
syncClient.appendEntry(logRequest.appendEntryRequest);
if (result.status == Response.RESPONSE_OUT_OF_WINDOW) {
Thread.sleep(100);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 2767d4e..caecf06 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -93,6 +93,13 @@ public interface PartitionTable {
*/
PartitionGroup getPartitionGroup(RaftNode headerNode);
+ /**
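+ * Find the replicationNum partition groups that the given node belongs to.
+ * @param node the node whose partition groups are looked up
+ * @return the partition groups that contain {@code node}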
+ */
+ List<PartitionGroup> getPartitionGroups(Node node);
+
ByteBuffer serialize();
/**
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 60c532a..d0bbc73 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -189,7 +189,7 @@ public class SlotPartitionTable implements PartitionTable {
}
// find replicationNum groups that a node is in
- private List<PartitionGroup> getPartitionGroups(Node node) {
+ public List<PartitionGroup> getPartitionGroups(Node node) {
List<PartitionGroup> ret = new ArrayList<>();
int nodeIndex = findNodeIndex(node, nodeRing);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index fc2ddc0..e312319 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -740,7 +740,7 @@ public class MetaGroupMember extends RaftMember implements
IService, MetaGroupMe
logger.info("Starting sub-servers...");
synchronized (partitionTable) {
try {
- getDataGroupEngine().buildDataGroupMembers(partitionTable);
+ getDataGroupEngine().buildDataGroupMembers(thisNode, partitionTable);
sendHandshake();
} catch (Exception e) {
logger.error("Build partition table failed: ", e);
@@ -1701,7 +1701,7 @@ public class MetaGroupMember extends RaftMember
implements IService, MetaGroupMe
// update DataGroupMembers, as the node is removed, the members of some
groups are
// changed and there will also be one less group
NodeRemovalResult result = partitionTable.getNodeRemovalResult();
- getDataGroupEngine().removeNode(oldNode, result);
+ getDataGroupEngine().removeNode(oldNode, thisNode, result);
// the leader is removed, start the next election ASAP
if (oldNode.equals(leader.get()) && !oldNode.equals(thisNode)) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index ea3b463..66fee6b 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -2005,6 +2005,8 @@ public abstract class RaftMember implements
RaftMemberMBean {
AtomicLong newLeaderTerm = new AtomicLong(term.get());
AppendEntryRequest request = buildAppendEntryRequest(log.getLog(), true);
+ log.getFailedNodeIds().clear();
+ log.getStronglyAcceptedNodeIds().remove(Integer.MAX_VALUE);
try {
if (allNodes.size() > 2) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
index 6409acd..6f68acb 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
@@ -71,7 +71,7 @@ public class DataGroupEngine implements IService,
DataGroupEngineMBean {
private PartitionTable partitionTable;
private DataGroupMember.Factory dataMemberFactory;
private static MetaGroupMember metaGroupMember;
- private final Node thisNode = ClusterIoTDB.getInstance().getThisNode();
+ private Node thisNode = ClusterIoTDB.getInstance().getThisNode();
private static TProtocolFactory protocolFactory;
private DataGroupEngine() {
@@ -368,7 +368,8 @@ public class DataGroupEngine implements IService,
DataGroupEngineMBean {
* group which the local node is in) and start them.
*/
@SuppressWarnings("java:S1135")
- public void buildDataGroupMembers(PartitionTable partitionTable) {
+ public void buildDataGroupMembers(Node thisNode,
+ PartitionTable partitionTable) {
setPartitionTable(partitionTable);
// TODO-Cluster: if there are unchanged members, do not stop and restart
them
// clear previous members if the partition table is reloaded
@@ -380,7 +381,7 @@ public class DataGroupEngine implements IService,
DataGroupEngineMBean {
value.setUnchanged(false);
}
- List<PartitionGroup> partitionGroups = partitionTable.getLocalGroups();
+ List<PartitionGroup> partitionGroups =
partitionTable.getPartitionGroups(thisNode);
for (PartitionGroup partitionGroup : partitionGroups) {
RaftNode header = partitionGroup.getHeader();
DataGroupMember prevMember = headerGroupMap.get(header);
@@ -426,17 +427,18 @@ public class DataGroupEngine implements IService,
DataGroupEngineMBean {
* group, set the member to read only so that it can still provide data for
other nodes that has
* not yet pulled its data. Otherwise, just change the node list of the
member and pull new data.
* And create a new DataGroupMember if this node should join a new group
because of this removal.
- *
- * @param node
+ * @param node the node that is removed from the cluster
+ * @param thisNode the local node, as passed in by the caller
* @param removalResult cluster changes due to the node removal
*/
- public void removeNode(Node node, NodeRemovalResult removalResult) {
+ public void removeNode(Node node, Node thisNode,
+ NodeRemovalResult removalResult) {
Iterator<Entry<RaftNode, DataGroupMember>> entryIterator =
headerGroupMap.entrySet().iterator();
synchronized (headerGroupMap) {
while (entryIterator.hasNext()) {
Entry<RaftNode, DataGroupMember> entry = entryIterator.next();
DataGroupMember dataGroupMember = entry.getValue();
- if (dataGroupMember.getHeader().getNode().equals(node) ||
node.equals(thisNode)) {
+ if (dataGroupMember.getHeader().getNode().equals(node) ||
node.equals(this.thisNode)) {
entryIterator.remove();
removeMember(
entry.getKey(), dataGroupMember,
dataGroupMember.getHeader().getNode().equals(node));
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 878528b..ff29dc4 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -50,6 +50,7 @@ import java.nio.ByteBuffer;
import java.util.List;
public class MetaAsyncService extends BaseAsyncService implements
TSMetaService.AsyncIface {
+
private static final String ERROR_MSG_META_NOT_READY = "The metadata not is
not ready.";
private static final Logger logger =
LoggerFactory.getLogger(MetaAsyncService.class);
@@ -61,7 +62,8 @@ public class MetaAsyncService extends BaseAsyncService
implements TSMetaService.
}
@Override
- public void appendEntry(AppendEntryRequest request, AsyncMethodCallback
resultHandler) {
+ public void appendEntry(AppendEntryRequest request,
+ AsyncMethodCallback<AppendEntryResult> resultHandler) {
// if the metaGroupMember is not ready (e.g., as a follower the
PartitionTable is loaded
// locally, but the partition table is not verified), we do not handle the
RPC requests.
if (!metaGroupMember.isReady() && metaGroupMember.getPartitionTable() ==
null) {
@@ -71,7 +73,8 @@ public class MetaAsyncService extends BaseAsyncService
implements TSMetaService.
// ready.
// this node lacks information of the cluster and refuse to work
logger.debug("This node is blind to the cluster and cannot accept logs");
- resultHandler.onComplete(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE);
+ resultHandler.onComplete(
+ new AppendEntryResult().setStatus(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE));
return;
}
@@ -214,7 +217,7 @@ public class MetaAsyncService extends BaseAsyncService
implements TSMetaService.
/**
* Forward a node removal request to the leader.
*
- * @param node the node to be removed
+ * @param node the node to be removed
* @param resultHandler
* @return true if the request is successfully forwarded, false otherwise
*/
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index 00b3f83..aea1eab 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -101,6 +101,11 @@ public class MetaHeartbeatThreadTest extends
HeartbeatThreadTest {
}
@Override
+ public List<PartitionGroup> getPartitionGroups(Node node) {
+ return null;
+ }
+
+ @Override
public boolean deserialize(ByteBuffer buffer) {
return true;
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
index f897701..c585022 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.cluster.server.member;
+import static org.apache.iotdb.cluster.server.member.MetaGroupMember.NODE_IDENTIFIER_FILE_NAME;
+import static org.apache.iotdb.cluster.server.member.MetaGroupMember.PARTITION_FILE_NAME;
+
import org.apache.iotdb.cluster.ClusterIoTDB;
import org.apache.iotdb.cluster.client.ClientCategory;
import org.apache.iotdb.cluster.client.ClientManager;
@@ -203,8 +206,8 @@ public class BaseMember {
RegisterManager.setDeregisterTimeOut(100);
EnvironmentUtils.cleanEnv();
ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(prevUrls);
- new File(MetaGroupMember.PARTITION_FILE_NAME).delete();
- new File(MetaGroupMember.NODE_IDENTIFIER_FILE_NAME).delete();
+ new File(PARTITION_FILE_NAME).delete();
+ new File(NODE_IDENTIFIER_FILE_NAME).delete();
RaftMember.setWaitLeaderTimeMs(prevLeaderWait);
testThreadPool.shutdownNow();
ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(prevUseAsyncServer);
@@ -215,6 +218,9 @@ public class BaseMember {
ClusterConstant.setSyncLeaderMaxWaitMs(syncLeaderMaxWait);
ClusterConstant.setHeartbeatIntervalMs(heartBeatInterval);
ClusterConstant.setElectionTimeoutMs(electionTimeout);
+
+ new File(PARTITION_FILE_NAME).delete();
+ new File(NODE_IDENTIFIER_FILE_NAME).delete();
}
DataGroupMember getDataGroupMember(Node node) {
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 99aa6d3..ac2996b 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -144,6 +144,7 @@ import static
org.apache.iotdb.cluster.server.NodeCharacter.LEADER;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -189,7 +190,7 @@ public class MetaGroupMemberTest extends BaseMember {
new DataGroupMember.Factory(new Factory(), testMetaMember) {
@Override
public DataGroupMember create(Node thisNode, PartitionGroup
partitionGroup) {
- return getDataGroupMember(partitionGroup,
TestUtils.getNode(0));
+ return getDataGroupMember(partitionGroup, thisNode);
}
},
testMetaMember);
@@ -1026,7 +1027,7 @@ public class MetaGroupMemberTest extends BaseMember {
}
@Test
- public void testProcessValidHeartbeatReq() throws QueryProcessException {
+ public void testProcessValidHeartbeatReq() {
System.out.println("Start testProcessValidHeartbeatReq()");
MetaGroupMember testMetaMember = getMetaGroupMember(TestUtils.getNode(10));
partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0));
@@ -1041,9 +1042,23 @@ public class MetaGroupMemberTest extends BaseMember {
request.setRegenerateIdentifier(true);
testMetaMember.setPartitionTable(null);
testMetaMember.processValidHeartbeatReq(request, response);
- assertTrue(response.getFollowerIdentifier() != 10);
+ assertNotEquals(10, response.getFollowerIdentifier());
assertTrue(response.isRequirePartitionTable());
+ } finally {
+ testMetaMember.stop();
+ }
+ }
+ @Test
+ public void testProcessValidHeartbeatReq2() {
+ System.out.println("Start testProcessValidHeartbeatReq()");
+ MetaGroupMember testMetaMember = getMetaGroupMember(TestUtils.getNode(10));
+ partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0));
+ testMetaMember.setCoordinator(new Coordinator());
+ try {
+ HeartBeatRequest request = new HeartBeatRequest();
+ HeartBeatResponse response = new HeartBeatResponse();
+ request.setRegenerateIdentifier(false);
request.setPartitionTableBytes(partitionTable.serialize());
testMetaMember.processValidHeartbeatReq(request, response);
assertEquals(partitionTable, testMetaMember.getPartitionTable());
@@ -1055,7 +1070,7 @@ public class MetaGroupMemberTest extends BaseMember {
@Test
public void testProcessValidHeartbeatResp() throws QueryProcessException {
System.out.println("Start testProcessValidHeartbeatResp()");
- MetaGroupMember metaGroupMember = getMetaGroupMember(TestUtils.getNode(9));
+ MetaGroupMember metaGroupMember =
getMetaGroupMember(TestUtils.getNode(10));
metaGroupMember.start();
metaGroupMember.onElectionWins();
try {
@@ -1088,20 +1103,21 @@ public class MetaGroupMemberTest extends BaseMember {
request.setLeaderCommit(0);
request.setPrevLogIndex(-1);
request.setPrevLogTerm(-1);
+ request.setIsFromLeader(true);
request.setLeader(new Node("127.0.0.1", 30000, 0, 40000,
Constants.RPC_PORT, "127.0.0.1"));
- AtomicReference<Long> result = new AtomicReference<>();
- GenericHandler<Long> handler = new GenericHandler<>(TestUtils.getNode(0),
result);
+ AtomicReference<AppendEntryResult> result = new AtomicReference<>();
+ GenericHandler<AppendEntryResult> handler = new
GenericHandler<>(TestUtils.getNode(0), result);
testMetaMember.setPartitionTable(null);
testMetaMember.setReady(false);
new MetaAsyncService(testMetaMember).appendEntry(request, handler);
- assertEquals(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE, (long)
result.get());
+ assertEquals(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE,
result.get().status);
System.out.println("Term after first append: " +
testMetaMember.getTerm().get());
testMetaMember.setPartitionTable(partitionTable);
testMetaMember.setReady(true);
new MetaAsyncService(testMetaMember).appendEntry(request, handler);
System.out.println("Term after second append: " +
testMetaMember.getTerm().get());
- assertEquals(Response.RESPONSE_AGREE, (long) result.get());
+ assertEquals(Response.RESPONSE_STRONG_ACCEPT, result.get().status);
}
@Test
@@ -1249,10 +1265,11 @@ public class MetaGroupMemberTest extends BaseMember {
System.out.println("Start testLoadIdentifier()");
try (RandomAccessFile raf =
new RandomAccessFile(MetaGroupMember.NODE_IDENTIFIER_FILE_NAME, "rw"))
{
- raf.writeBytes("100");
+ raf.writeBytes("50");
}
- MetaGroupMember metaGroupMember = getMetaGroupMember(new Node());
- assertEquals(100, metaGroupMember.getThisNode().getNodeIdentifier());
+ MetaGroupMember metaGroupMember =
+ getMetaGroupMember(TestUtils.getNode(50).setNodeIdentifier(0));
+ assertEquals(50, metaGroupMember.getThisNode().getNodeIdentifier());
metaGroupMember.stop();
}