This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch expr in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 438cd8cdebdbf3390795c089bf53656d178a7cef Author: jt <[email protected]> AuthorDate: Thu May 27 17:17:41 2021 +0800 implement server interfaces --- .../iotdb/cluster/server/DataClusterServer.java | 27 ++++++++++++++++++++++ .../iotdb/cluster/server/MetaClusterServer.java | 25 ++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index d88d1a3..69469b0 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.partition.PartitionGroup; import org.apache.iotdb.cluster.partition.PartitionTable; import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest; +import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement; import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest; import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq; @@ -963,4 +964,30 @@ public class DataClusterServer extends RaftServer getDataAsyncService(thisNode, resultHandler, hardLinkPath) .removeHardLink(hardLinkPath, resultHandler); } + + @Override + public long appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers) + throws TException { + return getDataSyncService(thisNode) + .appendEntryIndirect(request, subReceivers); + } + + @Override + public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack) { + getDataSyncService(thisNode).acknowledgeAppendEntry(ack); + } + + @Override + public void appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers, + AsyncMethodCallback<Long> resultHandler) { + getDataAsyncService(thisNode, resultHandler, request) + .appendEntryIndirect(request, subReceivers, resultHandler); + } + + @Override + public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack, + AsyncMethodCallback<Void> resultHandler) { + getDataAsyncService(thisNode, resultHandler, ack) + .acknowledgeAppendEntry(ack, resultHandler); + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java index ddc64cc..f18d920 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.cluster.server; +import java.util.List; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.coordinator.Coordinator; import org.apache.iotdb.cluster.exception.ConfigInconsistentException; @@ -26,6 +27,7 @@ import org.apache.iotdb.cluster.metadata.CMManager; import org.apache.iotdb.cluster.metadata.MetaPuller; import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse; import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest; +import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement; import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse; import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest; @@ -370,4 +372,27 @@ public class MetaClusterServer extends RaftServer public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) { asyncService.handshake(sender, resultHandler); } + + @Override + public long appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers) + throws TException { + return syncService.appendEntryIndirect(request, subReceivers); + } + + @Override + public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack) { + syncService.acknowledgeAppendEntry(ack); + } + + @Override + public void appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers, + AsyncMethodCallback<Long> resultHandler) { + asyncService.appendEntryIndirect(request, subReceivers, resultHandler); + } + + @Override + public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack, + AsyncMethodCallback<Void> resultHandler) { + asyncService.acknowledgeAppendEntry(ack, resultHandler); + } }
