This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_add_peer
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_add_peer by this push:
new 593b62e166 complete the part for MultiLeaderServerImpl to add new Peer
sync channel
593b62e166 is described below
commit 593b62e166b75c5a3617c0c48a9cd6f89cd9c75d
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed Aug 31 17:57:43 2022 +0800
complete the part for MultiLeaderServerImpl to add new Peer sync channel
---
.../multileader/MultiLeaderConsensus.java | 12 +++
.../multileader/MultiLeaderServerImpl.java | 10 +++
.../multileader/logdispatcher/IndexController.java | 9 ++-
.../multileader/logdispatcher/LogDispatcher.java | 90 ++++++++++++++++------
.../service/MultiLeaderRPCServiceProcessor.java | 7 ++
.../logdispatcher/IndexControllerTest.java | 8 +-
.../multileader/logdispatcher/SyncStatusTest.java | 8 +-
.../src/main/thrift/mutlileader.thrift | 10 +++
8 files changed, 119 insertions(+), 35 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index 33e59e361b..033303442b 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -230,6 +230,18 @@ public class MultiLeaderConsensus implements IConsensus {
@Override
public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer)
{
+ MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+ if (impl == null) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new ConsensusGroupNotExistException(groupId))
+ .build();
+ }
+ // step 1: inactive new Peer to prepare for following steps
+ impl.inactivePeer(peer);
+
+ // step 2: notify all the other Peers to build the sync connection to
newPeer
+ impl.notifyPeersToBuildSyncLogChannel(peer);
+
return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index c5e88aab16..9fc28bc826 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -183,6 +183,16 @@ public class MultiLeaderServerImpl {
stateMachine.loadSnapshot(latestSnapshotRootDir);
}
+ public void inactivePeer(Peer peer) {
+ // TODO: (xingtanzjr) investigate how to implement here smoothly using
sync/async client
+ }
+
+ public void notifyPeersToBuildSyncLogChannel(Peer targetPeer) {}
+
+ public void buildSyncLogChannel(Peer targetPeer) {
+
+ }
+
public void persistConfiguration() {
try (PublicBAOS publicBAOS = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
index 62cc0e5894..26f2a9c002 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
@@ -47,10 +47,12 @@ public class IndexController {
private final String storageDir;
private final String prefix;
+ private final long initialIndex;
- public IndexController(String storageDir, String prefix) {
+ public IndexController(String storageDir, String prefix, long initialIndex) {
this.storageDir = storageDir;
this.prefix = prefix + '-';
+ this.initialIndex = initialIndex;
restore();
}
@@ -137,10 +139,13 @@ public class IndexController {
}
currentIndex = lastFlushedIndex;
} else {
- versionFile = new File(directory, prefix + "0");
+ currentIndex = initialIndex;
+ versionFile = new File(directory, prefix + initialIndex);
try {
Files.createFile(versionFile.toPath());
} catch (IOException e) {
+ // TODO: (xingtanzjr) we need to handle the situation that file
creation failed.
+ // Or the dispatcher won't run correctly
logger.error("Error occurred when creating new file {}",
versionFile.getAbsolutePath(), e);
}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index e3539752be..97cee9c3c2 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -55,14 +55,15 @@ import java.util.stream.Collectors;
/** Manage all asynchronous replication threads and corresponding async
clients */
public class LogDispatcher {
-
private final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
-
+ private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L;
private final MultiLeaderServerImpl impl;
private final List<LogDispatcherThread> threads;
private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient>
clientManager;
private ExecutorService executorService;
+ private boolean stopped = false;
+
public LogDispatcher(
MultiLeaderServerImpl impl,
IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager) {
@@ -71,22 +72,26 @@ public class LogDispatcher {
this.threads =
impl.getConfiguration().stream()
.filter(x -> !Objects.equals(x, impl.getThisNode()))
- .map(x -> new LogDispatcherThread(x, impl.getConfig()))
+ .map(x -> new LogDispatcherThread(x, impl.getConfig(),
DEFAULT_INITIAL_SYNC_INDEX))
.collect(Collectors.toList());
if (!threads.isEmpty()) {
+ // We use cached thread pool here because each LogDispatcherThread will
occupy one thread.
+ // And every LogDispatcherThread won't release its thread in this pool
because it won't stop
+ // unless LogDispatcher stop.
+ // Thus, the size of this threadPool will be the same as the count of
LogDispatcherThread.
this.executorService =
- IoTDBThreadPoolFactory.newFixedThreadPool(
- threads.size(), "LogDispatcher-" +
impl.getThisNode().getGroupId());
+ IoTDBThreadPoolFactory.newCachedThreadPool(
+ "LogDispatcher-" + impl.getThisNode().getGroupId());
}
}
- public void start() {
+ public synchronized void start() {
if (!threads.isEmpty()) {
threads.forEach(executorService::submit);
}
}
- public void stop() {
+ public synchronized void stop() {
if (!threads.isEmpty()) {
threads.forEach(LogDispatcherThread::stop);
executorService.shutdownNow();
@@ -100,31 +105,64 @@ public class LogDispatcher {
logger.error("Unexpected Interruption when closing LogDispatcher
service ");
}
}
+ stopped = true;
}
- public OptionalLong getMinSyncIndex() {
+ public synchronized void addLogDispatcherThread(Peer peer, long
initialIndex) {
+ if (stopped) {
+ return;
+ }
+ //
+ LogDispatcherThread thread = new LogDispatcherThread(peer,
impl.getConfig(), initialIndex);
+ threads.add(thread);
+ executorService.submit(thread);
+ }
+
+ public synchronized void removeLogDispatcherThread(Peer peer) {
+ if (stopped) {
+ return;
+ }
+ int threadIndex = -1;
+ for (int i = 0; i < threads.size(); i++) {
+ if
(threads.get(i).peer.getEndpoint().getIp().equals(peer.getEndpoint().getIp())) {
+ threadIndex = i;
+ break;
+ }
+ }
+ if (threadIndex == -1) {
+ return;
+ }
+ threads.get(threadIndex).stop();
+ threads.remove(threadIndex);
+ }
+
+ public synchronized OptionalLong getMinSyncIndex() {
return
threads.stream().mapToLong(LogDispatcherThread::getCurrentSyncIndex).min();
}
public void offer(IndexedConsensusRequest request) {
List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
- threads.forEach(
- thread -> {
- logger.debug(
- "{}->{}: Push a log to the queue, where the queue length is {}",
- impl.getThisNode().getGroupId(),
- thread.getPeer().getEndpoint().getIp(),
- thread.getPendingRequest().size());
- if (!thread
- .getPendingRequest()
- .offer(new IndexedConsensusRequest(serializedRequests,
request.getSearchIndex()))) {
+ // we put the serialization step outside the synchronized block because it
is stateless and
+ // time-consuming
+ synchronized (this) {
+ threads.forEach(
+ thread -> {
logger.debug(
- "{}: Log queue of {} is full, ignore the log to this node,
searchIndex: {}",
+ "{}->{}: Push a log to the queue, where the queue length is
{}",
impl.getThisNode().getGroupId(),
- thread.getPeer(),
- request.getSearchIndex());
- }
- });
+ thread.getPeer().getEndpoint().getIp(),
+ thread.getPendingRequest().size());
+ if (!thread
+ .getPendingRequest()
+ .offer(new IndexedConsensusRequest(serializedRequests,
request.getSearchIndex()))) {
+ logger.debug(
+ "{}: Log queue of {} is full, ignore the log to this node,
searchIndex: {}",
+ impl.getThisNode().getGroupId(),
+ thread.getPeer(),
+ request.getSearchIndex());
+ }
+ });
+ }
}
public class LogDispatcherThread implements Runnable {
@@ -146,14 +184,16 @@ public class LogDispatcher {
private ConsensusReqReader.ReqIterator walEntryiterator;
private long iteratorIndex = 1;
- public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
+ public LogDispatcherThread(Peer peer, MultiLeaderConfig config, long
initialSyncIndex) {
this.peer = peer;
this.config = config;
this.pendingRequest =
new
ArrayBlockingQueue<>(config.getReplication().getMaxPendingRequestNumPerNode());
this.controller =
new IndexController(
- impl.getStorageDir(),
Utils.fromTEndPointToString(peer.getEndpoint()));
+ impl.getStorageDir(),
+ Utils.fromTEndPointToString(peer.getEndpoint()),
+ initialSyncIndex);
this.syncStatus = new SyncStatus(controller, config);
this.walEntryiterator = reader.getReqIterator(iteratorIndex);
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 201d04ab35..ec8e3f5789 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -31,6 +31,8 @@ import
org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
import
org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerReq;
import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerRes;
+import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelReq;
+import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelRes;
import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
@@ -165,5 +167,10 @@ public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIServ
new TActivatePeerRes(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
}
+ @Override
+ public void buildSyncLogChannel(
+ TBuildSyncLogChannelReq req,
AsyncMethodCallback<TBuildSyncLogChannelRes> resultHandler)
+ throws TException {}
+
public void handleClientExit() {}
}
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
index aa6f63783f..1f2c228f11 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
@@ -46,7 +46,7 @@ public class IndexControllerTest {
/** test indexController when incrementIntervalAfterRestart == false */
@Test
public void testIncrementIntervalAfterRestart() {
- IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix);
+ IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix, 0);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -55,7 +55,7 @@ public class IndexControllerTest {
Assert.assertEquals(IndexController.FLUSH_INTERVAL - 1,
controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix);
+ controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -63,7 +63,7 @@ public class IndexControllerTest {
Assert.assertEquals(IndexController.FLUSH_INTERVAL + 1,
controller.getCurrentIndex());
Assert.assertEquals(IndexController.FLUSH_INTERVAL,
controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix);
+ controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0);
Assert.assertEquals(IndexController.FLUSH_INTERVAL,
controller.getCurrentIndex());
Assert.assertEquals(IndexController.FLUSH_INTERVAL,
controller.getLastFlushedIndex());
@@ -71,7 +71,7 @@ public class IndexControllerTest {
Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2 - 1,
controller.getCurrentIndex());
Assert.assertEquals(IndexController.FLUSH_INTERVAL,
controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix);
+ controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0);
Assert.assertEquals(IndexController.FLUSH_INTERVAL,
controller.getCurrentIndex());
Assert.assertEquals(IndexController.FLUSH_INTERVAL,
controller.getLastFlushedIndex());
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index db6b03377a..4f94aafeaa 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -54,7 +54,7 @@ public class SyncStatusTest {
/** Confirm success from front to back */
@Test
public void sequenceTest() throws InterruptedException {
- IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix);
+ IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix, 0);
Assert.assertEquals(0, controller.getCurrentIndex());
SyncStatus status = new SyncStatus(controller, config);
@@ -79,7 +79,7 @@ public class SyncStatusTest {
/** Confirm success from back to front */
@Test
public void reverseTest() throws InterruptedException {
- IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix);
+ IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix, 0);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -111,7 +111,7 @@ public class SyncStatusTest {
/** Confirm success first from front to back, then back to front */
@Test
public void mixedTest() throws InterruptedException {
- IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix);
+ IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix, 0);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -155,7 +155,7 @@ public class SyncStatusTest {
/** Test Blocking while addNextBatch */
@Test
public void waitTest() throws InterruptedException, ExecutionException {
- IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix);
+ IndexController controller = new
IndexController(storageDir.getAbsolutePath(), prefix, 0);
Assert.assertEquals(0, controller.getCurrentIndex());
SyncStatus status = new SyncStatus(controller, config);
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 2b9d7ac587..993eb40ba7 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -51,8 +51,18 @@ struct TSyncLogRes {
1: required list<common.TSStatus> status
}
+struct TBuildSyncLogChannelReq {
+ 1: required common.TConsensusGroupId consensusGroupId
+ 2: required common.TEndPoint endPoint
+}
+
+struct TBuildSyncLogChannelRes {
+ 1: required list<common.TSStatus> status
+}
+
service MultiLeaderConsensusIService {
TSyncLogRes syncLog(TSyncLogReq req)
TInactivatePeerRes inactivatePeer(TInactivatePeerReq req)
TActivatePeerRes activatePeer(TActivatePeerReq req)
+ TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req)
}
\ No newline at end of file