This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_node_deletion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_node_deletion by this
push:
new 89e8f1f fix tests
89e8f1f is described below
commit 89e8f1f759e518a6dba2d4d83687528508102982
Author: jt2594838 <[email protected]>
AuthorDate: Tue Feb 11 16:24:16 2020 +0800
fix tests
---
cluster/partitions.tmp | Bin 511 -> 0 bytes
.../iotdb/cluster/partition/PartitionGroup.java | 5 +++
.../iotdb/cluster/partition/PartitionTable.java | 3 +-
.../handlers/caller/AppendGroupEntryHandler.java | 5 +--
.../cluster/server/member/MetaGroupMember.java | 4 +--
.../caller/AppendGroupEntryHandlerTest.java | 19 +++++++++++-
.../cluster/server/member/DataGroupMemberTest.java | 34 ++++++++++++++-------
.../cluster/server/member/MetaGroupMemberTest.java | 6 ----
.../org/apache/iotdb/db/engine/StorageEngine.java | 6 ++--
.../iotdb/db/exception/StorageEngineException.java | 1 +
.../StorageGroupProcessorException.java | 1 +
11 files changed, 59 insertions(+), 25 deletions(-)
diff --git a/cluster/partitions.tmp b/cluster/partitions.tmp
deleted file mode 100644
index 569a578..0000000
Binary files a/cluster/partitions.tmp and /dev/null differ
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index cccbe25..2a140f7 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.cluster.partition;
import java.util.ArrayList;
+import java.util.Arrays;
import org.apache.iotdb.cluster.rpc.thrift.Node;
/**
@@ -34,6 +35,10 @@ public class PartitionGroup extends ArrayList<Node> {
public PartitionGroup() {
}
+ public PartitionGroup(Node... nodes) {
+ this.addAll(Arrays.asList(nodes));
+ }
+
public PartitionGroup(PartitionGroup other) {
super(other);
this.thisNode = other.thisNode;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index bb2fb26..a6d94d5 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.PartitionUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -54,7 +55,7 @@ import org.slf4j.LoggerFactory;
public interface PartitionTable {
// static final is not necessary, it is redundant for an interface
Logger logger = LoggerFactory.getLogger(SlotPartitionTable.class);
- long PARTITION_INTERVAL =
IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+ long PARTITION_INTERVAL = StorageEngine.getTimePartitionInterval();
/**
* Given the storageGroupName and the timestamp, return the list of nodes on
which the storage
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
index 07b8c76..e19a03b 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
@@ -20,10 +20,10 @@
package org.apache.iotdb.cluster.server.handlers.caller;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
-import static
org.apache.iotdb.cluster.server.member.MetaGroupMember.REPLICATION_NUM;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -52,6 +52,7 @@ public class AppendGroupEntryHandler implements
AsyncMethodCallback<Long> {
// store the flag of leadership lost and the new leader's term
private AtomicBoolean leaderShipStale;
private AtomicLong newLeaderTerm;
+ private int replicationNum =
ClusterDescriptor.getINSTANCE().getConfig().getReplicationNum();
public AppendGroupEntryHandler(int[] groupReceivedCounter, int
receiverNodeIndex,
Node receiverNode, AtomicBoolean leaderShipStale, Log log, AtomicLong
newLeaderTerm) {
@@ -94,7 +95,7 @@ public class AppendGroupEntryHandler implements
AsyncMethodCallback<Long> {
logger.debug("Node {} has accepted log {}", receiverNode, log);
// this node is contained in REPLICATION_NUM groups, decrease the
counters of these groups
int startIndex = receiverNodeIndex;
- for (int i = 0; i < REPLICATION_NUM; i++) {
+ for (int i = 0; i < replicationNum; i++) {
int nodeIndex = receiverNodeIndex - i;
if (nodeIndex < 0) {
nodeIndex += groupReceivedCounter.length;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 57c63a7..ebc606b 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -136,7 +136,7 @@ public class MetaGroupMember extends RaftMember implements
TSMetaService.AsyncIf
private static final Logger logger =
LoggerFactory.getLogger(MetaGroupMember.class);
private static final int DEFAULT_JOIN_RETRY = 10;
private static final int REPORT_INTERVAL_SEC = 10;
- public static final int REPLICATION_NUM =
+ public final int REPLICATION_NUM =
ClusterDescriptor.getINSTANCE().getConfig().getReplicationNum();
// blind nodes are nodes that does not know the nodes in the cluster
@@ -885,7 +885,7 @@ public class MetaGroupMember extends RaftMember implements
TSMetaService.AsyncIf
return null;
} else if (PartitionUtils.isGlobalPlan(plan)) {// forward the plan to all
nodes
return processNonPartitionedPlan(plan);
- } else { //split the plan and forward them to some ParititonGroups
+ } else { //split the plan and forward them to some PartitionGroups
try {
return processPartitionedPlan(plan);
} catch (UnsupportedPlanException e) {
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
index fc6adef..a1b70ad 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cluster.server.handlers.caller;
-import static
org.apache.iotdb.cluster.server.member.MetaGroupMember.REPLICATION_NUM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -29,12 +28,30 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.cluster.common.TestException;
import org.apache.iotdb.cluster.common.TestLog;
import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.server.Response;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
public class AppendGroupEntryHandlerTest {
+ private int REPLICATION_NUM;
+ private int prevReplicationNum;
+
+ @Before
+ public void setUp() {
+ prevReplicationNum =
ClusterDescriptor.getINSTANCE().getConfig().getReplicationNum();
+ ClusterDescriptor.getINSTANCE().getConfig().setReplicationNum(2);
+ REPLICATION_NUM =
ClusterDescriptor.getINSTANCE().getConfig().getReplicationNum();
+ }
+
+ @After
+ public void tearDown() {
+
ClusterDescriptor.getINSTANCE().getConfig().setReplicationNum(prevReplicationNum);
+ }
+
@Test
public void testAgreement() throws InterruptedException {
int[] groupReceivedCounter = new int[10];
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 52472c8..fbc276e 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.cluster.common.TestDataClient;
import org.apache.iotdb.cluster.common.TestException;
import org.apache.iotdb.cluster.common.TestPartitionedLogManager;
import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.ReaderNotFoundException;
import org.apache.iotdb.cluster.log.Snapshot;
import org.apache.iotdb.cluster.log.applier.DataLogApplier;
@@ -109,9 +110,12 @@ public class DataGroupMemberTest extends MemberTest {
private boolean hasInitialSnapshots;
private boolean enableSyncLeader;
private int numSlotsToPull = 2;
+ private int prevReplicationNum;
@Before
public void setUp() throws Exception {
+ prevReplicationNum =
ClusterDescriptor.getINSTANCE().getConfig().getReplicationNum();
+ ClusterDescriptor.getINSTANCE().getConfig().setReplicationNum(3);
super.setUp();
dataGroupMember = getDataGroupMember(TestUtils.getNode(0));
snapshotMap = new HashMap<>();
@@ -124,6 +128,12 @@ public class DataGroupMemberTest extends MemberTest {
pulledSnapshots = new HashSet<>();
}
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+
ClusterDescriptor.getINSTANCE().getConfig().setReplicationNum(prevReplicationNum);
+ }
+
private PartitionedSnapshotLogManager getLogManager(PartitionGroup
partitionGroup) {
return new TestPartitionedLogManager(new DataLogApplier(testMetaMember),
testMetaMember.getPartitionTable(), partitionGroup.getHeader(),
FileSnapshot::new) {
@@ -147,6 +157,10 @@ public class DataGroupMemberTest extends MemberTest {
private DataGroupMember getDataGroupMember(Node node) throws IOException {
PartitionGroup nodes = partitionTable.getHeaderGroup(node);
+ return getDataGroupMember(node, nodes);
+ }
+
+ private DataGroupMember getDataGroupMember(Node node, PartitionGroup nodes)
throws IOException {
return new DataGroupMember(new TCompactProtocol.Factory(), nodes, node,
getLogManager(nodes),
testMetaMember, new TAsyncClientManager()) {
@Override
@@ -253,11 +267,6 @@ public class DataGroupMemberTest extends MemberTest {
};
}
- @After
- public void tearDown() throws Exception {
- super.tearDown();
- }
-
@Test
public void testGetHeader() {
assertEquals(TestUtils.getNode(0), dataGroupMember.getHeader());
@@ -265,9 +274,12 @@ public class DataGroupMemberTest extends MemberTest {
@Test
public void testAddNode() throws IOException {
- DataGroupMember firstMember = dataGroupMember;
- DataGroupMember midMember = getDataGroupMember(TestUtils.getNode(50));
- DataGroupMember lastMember = getDataGroupMember(TestUtils.getNode(90));
+ PartitionGroup partitionGroup = new PartitionGroup(TestUtils.getNode(0),
+ TestUtils.getNode(50), TestUtils.getNode(90));
+ DataGroupMember firstMember = getDataGroupMember(TestUtils.getNode(0),
+ new PartitionGroup(partitionGroup));
+ DataGroupMember midMember = getDataGroupMember(TestUtils.getNode(50), new
PartitionGroup(partitionGroup));
+ DataGroupMember lastMember = getDataGroupMember(TestUtils.getNode(90), new
PartitionGroup(partitionGroup));
Node newNodeBeforeGroup = TestUtils.getNode(-5);
assertFalse(firstMember.addNode(newNodeBeforeGroup));
@@ -418,7 +430,7 @@ public class DataGroupMemberTest extends MemberTest {
hasInitialSnapshots = true;
dataGroupMember.setCharacter(NodeCharacter.LEADER);
PullSnapshotRequest request = new PullSnapshotRequest();
- List<Integer> requiredSlots = Arrays.asList(1, 3, 5, 7, 9, 11);
+ List<Integer> requiredSlots = Arrays.asList(1, 3, 5, 7, 9, 11, 101);
request.setRequiredSlots(requiredSlots);
AtomicReference<Map<Integer, FileSnapshot>> reference = new
AtomicReference<>();
PullSnapshotHandler<FileSnapshot> handler = new
PullSnapshotHandler<>(reference,
@@ -439,9 +451,9 @@ public class DataGroupMemberTest extends MemberTest {
dataGroupMember.start();
try {
hasInitialSnapshots = false;
- partitionTable.addNode(TestUtils.getNode(10));
+ partitionTable.addNode(TestUtils.getNode(100));
List<Integer> requiredSlots = Arrays.asList(19, 39, 59, 79, 99);
- dataGroupMember.pullNodeAdditionSnapshots(requiredSlots,
TestUtils.getNode(10));
+ dataGroupMember.pullNodeAdditionSnapshots(requiredSlots,
TestUtils.getNode(100));
assertEquals(requiredSlots.size(), receivedSnapshots.size());
for (Integer requiredSlot : requiredSlots) {
receivedSnapshots.get(requiredSlot).getRemoteSnapshot();
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 0e2bd42..f96af5c 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -412,12 +412,6 @@ public class MetaGroupMemberTest extends MemberTest {
}
}
- @Override
- @After
- public void tearDown() throws Exception {
- super.tearDown();
- }
-
@Test
public void testClosePartition() throws QueryProcessException,
StorageEngineException {
// the operation is accepted
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 255b16d..a6b058b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -92,10 +92,12 @@ public class StorageEngine implements IService {
private static final ExecutorService recoveryThreadPool =
IoTDBThreadPoolFactory
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
"Recovery-Thread-Pool");
- private static final StorageEngine INSTANCE = new StorageEngine();
+ static class InstanceHolder {
+ private static final StorageEngine INSTANCE = new StorageEngine();
+ }
public static StorageEngine getInstance() {
- return INSTANCE;
+ return InstanceHolder.INSTANCE;
}
private ScheduledExecutorService ttlCheckThread;
diff --git
a/server/src/main/java/org/apache/iotdb/db/exception/StorageEngineException.java
b/server/src/main/java/org/apache/iotdb/db/exception/StorageEngineException.java
index cb1b264..4ab8e50 100644
---
a/server/src/main/java/org/apache/iotdb/db/exception/StorageEngineException.java
+++
b/server/src/main/java/org/apache/iotdb/db/exception/StorageEngineException.java
@@ -35,6 +35,7 @@ public class StorageEngineException extends ProcessException {
public StorageEngineException(ProcessException e) {
super(e.getMessage());
+ initCause(e);
errorCode = e.getErrorCode();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/exception/storageGroup/StorageGroupProcessorException.java
b/server/src/main/java/org/apache/iotdb/db/exception/storageGroup/StorageGroupProcessorException.java
index 08b9572..075a0f1 100644
---
a/server/src/main/java/org/apache/iotdb/db/exception/storageGroup/StorageGroupProcessorException.java
+++
b/server/src/main/java/org/apache/iotdb/db/exception/storageGroup/StorageGroupProcessorException.java
@@ -27,6 +27,7 @@ public class StorageGroupProcessorException extends
ProcessException {
public StorageGroupProcessorException(Exception exception) {
super(exception.getMessage());
+ initCause(exception);
errorCode = TSStatusCode.STORAGE_GROUP_PROCESSOR_ERROR.getStatusCode();
}