This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch expr in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4aa5448b1eceb0e499266678d3677787e0d9a010 Author: jt <[email protected]> AuthorDate: Tue May 25 09:40:24 2021 +0800 temp save --- cluster/distribute.sh | 19 ++ cluster/src/assembly/resources/sbin/expr-bench.sh | 73 ++++++ cluster/src/assembly/resources/sbin/expr-server.sh | 73 ++++++ .../apache/iotdb/cluster/config/ClusterConfig.java | 10 + .../org/apache/iotdb/cluster/expr/ExprBench.java | 103 ++++++++ .../ExprLogDispatcher.java} | 217 ++++------------- .../org/apache/iotdb/cluster/expr/ExprMember.java | 151 ++++++++++++ .../org/apache/iotdb/cluster/expr/ExprServer.java | 90 +++++++ .../iotdb/cluster/log/IndirectLogDispatcher.java | 103 ++++++++ .../apache/iotdb/cluster/log/LogDispatcher.java | 35 ++- .../iotdb/cluster/log/manage/RaftLogManager.java | 33 +++ .../log/manage/UnCommittedEntryManager.java | 12 +- .../serializable/SyncLogDequeSerializer.java | 2 +- .../iotdb/cluster/server/DataClusterServer.java | 8 +- .../iotdb/cluster/server/MetaClusterServer.java | 24 +- .../apache/iotdb/cluster/server/RaftServer.java | 14 +- .../server/handlers/caller/HeartbeatHandler.java | 2 +- .../handlers/forwarder/IndirectAppendHandler.java | 49 ++++ .../cluster/server/heartbeat/HeartbeatThread.java | 3 +- .../server/heartbeat/MetaHeartbeatServer.java | 4 +- .../server/heartbeat/MetaHeartbeatThread.java | 10 +- .../cluster/server/member/MetaGroupMember.java | 26 +- .../iotdb/cluster/server/member/RaftMember.java | 264 ++++++++++++++++----- .../cluster/server/service/BaseAsyncService.java | 19 ++ .../cluster/server/service/BaseSyncService.java | 52 ++-- .../apache/iotdb/cluster/utils/ClientUtils.java | 2 + .../apache/iotdb/cluster/utils/ClusterNode.java | 2 +- .../apache/iotdb/cluster/utils/ClusterUtils.java | 12 + pom.xml | 52 ++-- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 2 + .../org/apache/iotdb/db/qp/logical/Operator.java | 1 + .../apache/iotdb/db/qp/physical/PhysicalPlan.java | 7 +- .../apache/iotdb/db/qp/physical/sys/ExprPlan.java | 84 +++++++ 
thrift-cluster/src/main/thrift/cluster.thrift | 21 ++ 34 files changed, 1239 insertions(+), 340 deletions(-) diff --git a/cluster/distribute.sh b/cluster/distribute.sh new file mode 100644 index 0000000..98758da --- /dev/null +++ b/cluster/distribute.sh @@ -0,0 +1,19 @@ +src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb* + +ips=(fit36 fit38 fit39) +target_lib_path=/data/iotdb_expr/lib/ + +for ip in ${ips[*]} + do + ssh fit@$ip "mkdir $target_lib_path" + scp -r $src_lib_path fit@$ip:$target_lib_path + done + +ips=(fit31 fit32 fit33 fit34) +target_lib_path=/disk/iotdb_expr/lib/ + +for ip in ${ips[*]} + do + ssh fit@$ip "mkdir $target_lib_path" + scp -r $src_lib_path fit@$ip:$target_lib_path + done \ No newline at end of file diff --git a/cluster/src/assembly/resources/sbin/expr-bench.sh b/cluster/src/assembly/resources/sbin/expr-bench.sh new file mode 100644 index 0000000..d4fa092 --- /dev/null +++ b/cluster/src/assembly/resources/sbin/expr-bench.sh @@ -0,0 +1,73 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + + +echo --------------------- +echo "Starting IoTDB (Cluster Mode)" +echo --------------------- + +if [ -z "${IOTDB_HOME}" ]; then + export IOTDB_HOME="`dirname "$0"`/.." +fi + +enable_printgc=false +if [ "$#" -ge "1" -a "$1" == "printgc" ]; then + enable_printgc=true; + shift +fi + +if [ -n "$JAVA_HOME" ]; then + for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do + if [ -x "$java" ]; then + JAVA="$java" + break + fi + done +else + JAVA=java +fi + +if [ -z $JAVA ] ; then + echo Unable to find java executable. Check JAVA_HOME and PATH environment variables. > /dev/stderr + exit 1; +fi + +CLASSPATH="" +for f in ${IOTDB_HOME}/lib/*.jar; do + CLASSPATH=${CLASSPATH}":"$f +done +classname=org.apache.iotdb.cluster.expr.ExprBench + +launch_service() +{ + iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback.xml" + iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}" + iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}" + iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}" + iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}" + iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB" + exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" $@ + return $? +} + +# Start up the service +launch_service "$classname" $@ + +exit $? diff --git a/cluster/src/assembly/resources/sbin/expr-server.sh b/cluster/src/assembly/resources/sbin/expr-server.sh new file mode 100644 index 0000000..f323d0b --- /dev/null +++ b/cluster/src/assembly/resources/sbin/expr-server.sh @@ -0,0 +1,73 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +echo --------------------- +echo "Starting IoTDB (Cluster Mode)" +echo --------------------- + +if [ -z "${IOTDB_HOME}" ]; then + export IOTDB_HOME="`dirname "$0"`/.." +fi + +enable_printgc=false +if [ "$#" -ge "1" -a "$1" == "printgc" ]; then + enable_printgc=true; + shift +fi + +if [ -n "$JAVA_HOME" ]; then + for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do + if [ -x "$java" ]; then + JAVA="$java" + break + fi + done +else + JAVA=java +fi + +if [ -z $JAVA ] ; then + echo Unable to find java executable. Check JAVA_HOME and PATH environment variables. > /dev/stderr + exit 1; +fi + +CLASSPATH="" +for f in ${IOTDB_HOME}/lib/*.jar; do + CLASSPATH=${CLASSPATH}":"$f +done +classname=org.apache.iotdb.cluster.expr.ExprServer + +launch_service() +{ + iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback.xml" + iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}" + iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}" + iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}" + iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}" + iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB" + exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" $@ + return $? +} + +# Start up the service +launch_service "$classname" $@ + +exit $? 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java index 7d3e42a..540201c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java @@ -168,6 +168,8 @@ public class ClusterConfig { private boolean openServerRpcPort = false; + private boolean useIndirectBroadcasting = false; + /** * create a clusterConfig class. The internalIP will be set according to the server's hostname. If * there is something error for getting the ip of the hostname, then set the internalIp as @@ -183,6 +185,14 @@ public class ClusterConfig { seedNodeUrls = Arrays.asList(String.format("%s:%d", internalIp, internalMetaPort)); } + public boolean isUseIndirectBroadcasting() { + return useIndirectBroadcasting; + } + + public void setUseIndirectBroadcasting(boolean useIndirectBroadcasting) { + this.useIndirectBroadcasting = useIndirectBroadcasting; + } + public int getSelectorNumOfClientPool() { return selectorNumOfClientPool; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java new file mode 100644 index 0000000..b625f2c --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.expr; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.iotdb.cluster.client.sync.SyncClientFactory; +import org.apache.iotdb.cluster.client.sync.SyncClientPool; +import org.apache.iotdb.cluster.client.sync.SyncMetaClient.FactorySync; +import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq; +import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client; +import org.apache.iotdb.db.qp.physical.sys.ExprPlan; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol.Factory; + +public class ExprBench { + + private AtomicLong requestCounter = new AtomicLong(); + private int threadNum = 64; + private int workloadSize = 64 * 1024; + private SyncClientPool clientPool; + private Node target; + private int maxRequestNum; + + public ExprBench(Node target) { + this.target = target; + SyncClientFactory factory = new FactorySync(new Factory()); + clientPool = new SyncClientPool(factory); + } + + public void benchmark() { + long startTime = System.currentTimeMillis(); + for (int i = 0; i < threadNum; i++) { + new Thread( + () -> { + Client client = clientPool.getClient(target); + ExecutNonQueryReq request = new ExecutNonQueryReq(); + ExprPlan plan = new ExprPlan(); + plan.setWorkload(new byte[workloadSize]); + plan.setNeedForward(true); + ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096); + plan.serialize(byteBuffer); + byteBuffer.flip(); + request.setPlanBytes(byteBuffer); 
+ long currRequsetNum = -1; + while (true) { + + try { + client.executeNonQueryPlan(request); + currRequsetNum = requestCounter.incrementAndGet(); + } catch (TException e) { + e.printStackTrace(); + } + + if (currRequsetNum % 1000 == 0) { + long elapsedTime = System.currentTimeMillis() - startTime; + System.out.println(String.format("%d %d %f(%f)", elapsedTime, + currRequsetNum, + (currRequsetNum + 0.0) / elapsedTime, + currRequsetNum * workloadSize / (1024.0*1024.0) / elapsedTime)); + } + + if (currRequsetNum >= maxRequestNum) { + break; + } + } + }) + .start(); + } + } + + public void setMaxRequestNum(int maxRequestNum) { + this.maxRequestNum = maxRequestNum; + } + + public static void main(String[] args) { + Node target = new Node(); + target.setInternalIp(args[0]); + target.setMetaPort(Integer.parseInt(args[1])); + ExprBench bench = new ExprBench(target); + bench.maxRequestNum = Integer.parseInt(args[2]); + bench.threadNum = Integer.parseInt(args[3]); + bench.benchmark(); + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java similarity index 63% copy from cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java copy to cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java index a600c4a..4b302a7 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java @@ -17,9 +17,27 @@ * under the License. 
*/ -package org.apache.iotdb.cluster.log; +package org.apache.iotdb.cluster.expr; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.iotdb.cluster.client.sync.SyncClientPool; +import org.apache.iotdb.cluster.client.sync.SyncMetaClient; +import org.apache.iotdb.cluster.client.sync.SyncMetaClient.FactorySync; import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.exception.UnknownLogTypeException; +import org.apache.iotdb.cluster.log.Log; import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest; import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; import org.apache.iotdb.cluster.rpc.thrift.Node; @@ -33,26 +51,13 @@ import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.utils.TestOnly; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.thrift.TException; import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TBinaryProtocol.Factory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import 
java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - /** * A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its * followers and send the logs in an ordered manner so that the followers will not wait for previous @@ -60,27 +65,25 @@ import java.util.concurrent.atomic.AtomicLong; * follower A, the actual reach order may be log3, log2, and log1. According to the protocol, log3 * and log2 must halt until log1 reaches, as a result, the total delay may increase significantly. */ -public class LogDispatcher { +public class ExprLogDispatcher { - private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class); - private RaftMember member; - private boolean useBatchInLogCatchUp = - ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp(); + private static final Logger logger = LoggerFactory.getLogger(ExprLogDispatcher.class); private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>(); private ExecutorService executorService; private static ExecutorService serializationService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build()); + private SyncClientPool clientPool; + private Node leader; - public LogDispatcher(RaftMember member) { - this.member = member; + public ExprLogDispatcher(List<Node> nodes, Node leader) { executorService = Executors.newCachedThreadPool(); - for (Node node : member.getAllNodes()) { - if (!node.equals(member.getThisNode())) { - nodeLogQueues.add(createQueueAndBindingThread(node)); - } + for (Node node : nodes) { + nodeLogQueues.add(createQueueAndBindingThread(node)); } + clientPool = new SyncClientPool(new FactorySync(new Factory())); + this.leader = leader; } @TestOnly @@ -114,15 +117,9 @@ 
public class LogDispatcher { addSucceeded = nodeLogQueue.add(log); } - if (!addSucceeded) { - logger.debug( - "Log queue[{}] of {} is full, ignore the log to this node", i, member.getName()); - } else { + if (addSucceeded) { log.setEnqueueTime(System.nanoTime()); } - } catch (IllegalStateException e) { - logger.debug( - "Log queue[{}] of {} is full, ignore the log to this node", i, member.getName()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -219,30 +216,26 @@ public class LogDispatcher { class DispatcherThread implements Runnable { - private Node receiver; + private Node node; + private Client client; private BlockingQueue<SendLogRequest> logBlockingDeque; private List<SendLogRequest> currBatch = new ArrayList<>(); - private Peer peer; - DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) { - this.receiver = receiver; + DispatcherThread(Node node, BlockingQueue<SendLogRequest> logBlockingDeque) { + this.client = clientPool.getClient(node); this.logBlockingDeque = logBlockingDeque; - this.peer = - member - .getPeerMap() - .computeIfAbsent(receiver, r -> new Peer(member.getLogManager().getLastLogIndex())); } @Override public void run() { - Thread.currentThread().setName("LogDispatcher-" + member.getName() + "-" + receiver); + Thread.currentThread().setName("LogDispatcher-" + node); try { while (!Thread.interrupted()) { SendLogRequest poll = logBlockingDeque.take(); currBatch.add(poll); logBlockingDeque.drainTo(currBatch); if (logger.isDebugEnabled()) { - logger.debug("Sending {} logs to {}", currBatch.size(), receiver); + logger.debug("Sending {} logs to {}", currBatch.size(), node); } for (SendLogRequest request : currBatch) { request.getAppendEntryRequest().entry = request.serializedLogFuture.get(); @@ -258,59 +251,25 @@ public class LogDispatcher { logger.info("Dispatcher exits"); } - private void appendEntriesAsync( - List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> 
currBatch) - throws TException { - AsyncMethodCallback<Long> handler = new AppendEntriesHandler(currBatch); - AsyncClient client = member.getSendLogAsyncClient(receiver); - if (logger.isDebugEnabled()) { - logger.debug( - "{}: append entries {} with {} logs", member.getName(), receiver, logList.size()); - } - if (client != null) { - client.appendEntries(request, handler); - } - } - private void appendEntriesSync( List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> currBatch) { long startTime = Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime(); - if (!member.waitForPrevLog(peer, currBatch.get(0).getLog())) { - logger.warn( - "{}: node {} timed out when appending {}", - member.getName(), - receiver, - currBatch.get(0).getLog()); - return; - } Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime); - Client client = member.getSyncClient(receiver); - if (client == null) { - logger.error("No available client for {}", receiver); - return; - } - AsyncMethodCallback<Long> handler = new AppendEntriesHandler(currBatch); startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime(); try { long result = client.appendEntries(request); Timer.Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime); if (result != -1 && logger.isInfoEnabled()) { logger.info( - "{}: Append {} logs to {}, resp: {}", - member.getName(), + "Append {} logs to {}, resp: {}", logList.size(), - receiver, + node, result); } - handler.onComplete(result); } catch (TException e) { - client.getInputProtocol().getTransport().close(); - handler.onError(e); logger.warn("Failed logs: {}, first index: {}", logList, request.prevLogIndex + 1); - } finally { - ClientUtils.putBackSyncClient(client); } } @@ -318,15 +277,8 @@ public class LogDispatcher { List<ByteBuffer> logList, List<SendLogRequest> currBatch, int firstIndex) { AppendEntriesRequest request = new AppendEntriesRequest(); - if (member.getHeader() != null) 
{ - request.setHeader(member.getHeader()); - } - request.setLeader(member.getThisNode()); - request.setLeaderCommit(member.getLogManager().getCommitLogIndex()); - - synchronized (member.getTerm()) { - request.setTerm(member.getTerm().get()); - } + request.setLeader(leader); + request.setTerm(1); request.setEntries(logList); // set index for raft @@ -339,7 +291,7 @@ public class LogDispatcher { return request; } - private void sendLogs(List<SendLogRequest> currBatch) throws TException { + private void sendLogs(List<SendLogRequest> currBatch) { int logIndex = 0; logger.debug( "send logs from index {} to {}", @@ -362,11 +314,7 @@ public class LogDispatcher { } AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex); - if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { - appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex)); - } else { - appendEntriesSync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex)); - } + appendEntriesSync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex)); for (; prevIndex < logIndex; prevIndex++) { Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart( currBatch.get(prevIndex).getLog().getCreateTime()); @@ -374,84 +322,9 @@ public class LogDispatcher { } } - private void sendBatchLogs(List<SendLogRequest> currBatch) throws TException { - if (currBatch.size() > 1) { - if (useBatchInLogCatchUp) { - sendLogs(currBatch); - } else { - for (SendLogRequest batch : currBatch) { - sendLog(batch); - } - } - } else { - sendLog(currBatch.get(0)); - } - } - - private void sendLog(SendLogRequest logRequest) { - Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart( - logRequest.getLog().getCreateTime()); - member.sendLogToFollower( - logRequest.getLog(), - logRequest.getVoteCounter(), - receiver, - logRequest.getLeaderShipStale(), - logRequest.getNewLeaderTerm(), - 
logRequest.getAppendEntryRequest()); - Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart( - logRequest.getLog().getCreateTime()); + private void sendBatchLogs(List<SendLogRequest> currBatch) { + sendLogs(currBatch); } - class AppendEntriesHandler implements AsyncMethodCallback<Long> { - - private final List<AsyncMethodCallback<Long>> singleEntryHandlers; - - private AppendEntriesHandler(List<SendLogRequest> batch) { - singleEntryHandlers = new ArrayList<>(batch.size()); - for (SendLogRequest sendLogRequest : batch) { - AppendNodeEntryHandler handler = - getAppendNodeEntryHandler( - sendLogRequest.getLog(), - sendLogRequest.getVoteCounter(), - receiver, - sendLogRequest.getLeaderShipStale(), - sendLogRequest.getNewLeaderTerm(), - peer); - singleEntryHandlers.add(handler); - } - } - - @Override - public void onComplete(Long aLong) { - for (AsyncMethodCallback<Long> singleEntryHandler : singleEntryHandlers) { - singleEntryHandler.onComplete(aLong); - } - } - - @Override - public void onError(Exception e) { - for (AsyncMethodCallback<Long> singleEntryHandler : singleEntryHandlers) { - singleEntryHandler.onError(e); - } - } - - private AppendNodeEntryHandler getAppendNodeEntryHandler( - Log log, - AtomicInteger voteCounter, - Node node, - AtomicBoolean leaderShipStale, - AtomicLong newLeaderTerm, - Peer peer) { - AppendNodeEntryHandler handler = new AppendNodeEntryHandler(); - handler.setReceiver(node); - handler.setVoteCounter(voteCounter); - handler.setLeaderShipStale(leaderShipStale); - handler.setLog(log); - handler.setMember(member); - handler.setPeer(peer); - handler.setReceiverTerm(newLeaderTerm); - return handler; - } - } } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java new file mode 100644 index 0000000..efa63c2 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java @@ -0,0 +1,151 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.expr; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import org.apache.iotdb.cluster.coordinator.Coordinator; +import org.apache.iotdb.cluster.log.Log; +import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq; +import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client; +import org.apache.iotdb.cluster.server.Response; +import org.apache.iotdb.cluster.server.member.MetaGroupMember; +import org.apache.iotdb.cluster.server.monitor.Timer; +import org.apache.iotdb.cluster.utils.ClientUtils; +import org.apache.iotdb.cluster.utils.ClusterUtils; +import org.apache.iotdb.cluster.utils.StatusUtils; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.sys.ExprPlan; +import org.apache.iotdb.service.rpc.thrift.TSStatus; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TProtocolFactory; + +public class ExprMember extends MetaGroupMember { + + public ExprMember() { + } + + public ExprMember(Node thisNode, List<Node> allNodes) 
{ + this.thisNode = thisNode; + this.allNodes = allNodes; + } + + public ExprMember(TProtocolFactory factory, + Node thisNode, Coordinator coordinator) + throws QueryProcessException { + super(factory, thisNode, coordinator); + } + + private int windowSize = 10000; + private Log[] logWindow = new Log[windowSize]; + private long windowPrevLogIndex; + private long windowPrevLogTerm; + private long windowTerm; + + @Override + protected synchronized void startSubServers() { + + } + + @Override + public TSStatus executeNonQueryPlan(PhysicalPlan plan) { + if (false) { + if (plan instanceof ExprPlan && !((ExprPlan) plan).isNeedForward()) { + return StatusUtils.OK; + } else if (plan instanceof ExprPlan) { + ((ExprPlan) plan).setNeedForward(false); + } + + ExecutNonQueryReq req = new ExecutNonQueryReq(); + ByteBuffer byteBuffer = ByteBuffer.allocate(128 * 1024); + plan.serialize(byteBuffer); + byteBuffer.flip(); + req.setPlanBytes(byteBuffer); + + for (Node node : getAllNodes()) { + if (!ClusterUtils.isNodeEquals(node, thisNode)) { + Client syncClient = getSyncClient(node); + try { + syncClient.executeNonQueryPlan(req); + } catch (TException e) { + ClientUtils.putBackSyncClient(syncClient); + return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage()); + } + ClientUtils.putBackSyncClient(syncClient); + } + } + return StatusUtils.OK; + } + return processNonPartitionedMetaPlan(plan); + } + + protected long appendEntry1(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) { + long resp; + long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime(); + long success = 0; + + synchronized (logManager) { + long windowPos = log.getCurrLogIndex() - logManager.getLastLogIndex() - 1; + if (windowPos < 0) { + success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log); + } else if (windowPos < windowSize) { + logWindow[(int) windowPos] = log; + if (windowPos == 0) { + windowPrevLogIndex = prevLogIndex; + 
windowPrevLogTerm = prevLogTerm; + + int flushPos = 0; + for (; flushPos < windowSize; flushPos++) { + if (logWindow[flushPos] == null) { + break; + } + } + // flush [0, flushPos) + List<Log> logs = Arrays.asList(logWindow).subList(0, flushPos); + success = logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit, + logs); + if (success != -1) { + System.arraycopy(logWindow, flushPos, logWindow, 0, windowSize - flushPos); + for (int i = 1; i <= flushPos; i++) { + logWindow[windowSize - i] = null; + } + } else { + System.out.println("not success"); + } + } + } else { + return Response.RESPONSE_LOG_MISMATCH; + } + } + + Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime); + if (success != -1) { + resp = Response.RESPONSE_AGREE; + } else { + // the incoming log points to an illegal position, reject it + resp = Response.RESPONSE_LOG_MISMATCH; + } + return resp; + } + +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java new file mode 100644 index 0000000..338755e --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.expr; + +import io.moquette.broker.config.IConfig; +import java.net.InetSocketAddress; +import java.util.Arrays; +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.exception.ConfigInconsistentException; +import org.apache.iotdb.cluster.exception.StartUpCheckFailureException; +import org.apache.iotdb.cluster.log.LogDispatcher; +import org.apache.iotdb.cluster.server.MetaClusterServer; +import org.apache.iotdb.cluster.server.member.MetaGroupMember; +import org.apache.iotdb.cluster.server.member.RaftMember; +import org.apache.iotdb.db.exception.StartupException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.tsfile.read.filter.operator.In; +import org.apache.thrift.protocol.TBinaryProtocol.Factory; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransportException; + +public class ExprServer extends MetaClusterServer { + + protected ExprServer() throws QueryProcessException { + super(); + } + + @Override + protected MetaGroupMember createMetaGroupMember() throws QueryProcessException { + return new ExprMember(new Factory(), thisNode, coordinator); + } + + @Override + protected String getClientThreadPrefix() { + return "FollowerClient"; + } + + @Override + protected String getServerClientName() { + return "FollowerServer"; + } + + @Override + protected TServerTransport getServerSocket() throws TTransportException { + return new TServerSocket( + new InetSocketAddress(thisNode.getInternalIp(), thisNode.getMetaPort())); + } + + + public static void main(String[] args) + throws StartupException, TTransportException, QueryProcessException, ConfigInconsistentException, StartUpCheckFailureException { + + String[] nodeStrings = args[0].split(":"); + 
String ip = nodeStrings[0]; + int port = Integer.parseInt(nodeStrings[1]); + String[] allNodeStr = args[1].split(","); + + int dispatcherThreadNum = Integer.parseInt(args[2]); + + ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(Arrays.asList(allNodeStr)); + ClusterDescriptor.getInstance().getConfig().setInternalMetaPort(port); + ClusterDescriptor.getInstance().getConfig().setInternalIp(ip); + ClusterDescriptor.getInstance().getConfig().setEnableRaftLogPersistence(false); + ClusterDescriptor.getInstance().getConfig().setUseBatchInLogCatchUp(false); + RaftMember.USE_LOG_DISPATCHER = true; + LogDispatcher.bindingThreadNum = dispatcherThreadNum; + + ExprServer server = new ExprServer(); + server.start(); + server.buildCluster(); + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java new file mode 100644 index 0000000..75f1b16 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.cluster.log; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import org.apache.iotdb.cluster.log.LogDispatcher.DispatcherThread; +import org.apache.iotdb.cluster.query.manage.QueryCoordinator; +import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.server.member.RaftMember; +import org.apache.iotdb.cluster.server.monitor.Timer; +import org.apache.iotdb.cluster.utils.ClusterUtils; + +/** + * IndirectLogDispatcher sends entries only to a pre-selected subset of followers instead of all + * followers and let the selected followers to broadcast the log to other followers. + */ +public class IndirectLogDispatcher extends LogDispatcher { + + private Map<Node, List<Node>> directToIndirectFollowerMap = new HashMap<>(); + + public IndirectLogDispatcher(RaftMember member) { + super(member); + recalculateDirectFollowerMap(); + } + + @Override + LogDispatcher.DispatcherThread newDispatcherThread(Node node, + BlockingQueue<SendLogRequest> logBlockingQueue) { + return new DispatcherThread(node, logBlockingQueue); + } + + @Override + void createQueueAndBindingThreads() { + for (Node node : directToIndirectFollowerMap.keySet()) { + nodeLogQueues.add(createQueueAndBindingThread(node)); + } + } + + public void recalculateDirectFollowerMap() { + List<Node> allNodes = new ArrayList<>(member.getAllNodes()); + allNodes.removeIf(n -> ClusterUtils.isNodeEquals(n, member.getThisNode())); + QueryCoordinator instance = QueryCoordinator.getINSTANCE(); + List<Node> orderedNodes = instance.reorderNodes(allNodes); + + synchronized (this) { + directToIndirectFollowerMap.clear(); + for (int i = 0, j = orderedNodes.size() - 1; i <= j; i++, j--) { + if (i != j) { + directToIndirectFollowerMap.put(orderedNodes.get(i), + Collections.singletonList(orderedNodes.get(j))); + } else { + 
directToIndirectFollowerMap.put(orderedNodes.get(i), Collections.emptyList()); + } + } + } + } + + class DispatcherThread extends LogDispatcher.DispatcherThread { + + DispatcherThread(Node receiver, + BlockingQueue<SendLogRequest> logBlockingDeque) { + super(receiver, logBlockingDeque); + } + + @Override + void sendLog(SendLogRequest logRequest) { + Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart( + logRequest.getLog().getCreateTime()); + member.sendLogToFollower( + logRequest.getLog(), + logRequest.getVoteCounter(), + receiver, + logRequest.getLeaderShipStale(), + logRequest.getNewLeaderTerm(), + logRequest.getAppendEntryRequest(), directToIndirectFollowerMap.get(receiver)); + Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart( + logRequest.getLog().getCreateTime()); + } + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java index a600c4a..711b511 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java @@ -19,6 +19,7 @@ package org.apache.iotdb.cluster.log; +import org.apache.iotdb.cluster.config.ClusterConfig; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest; import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; @@ -63,19 +64,24 @@ import java.util.concurrent.atomic.AtomicLong; public class LogDispatcher { private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class); - private RaftMember member; - private boolean useBatchInLogCatchUp = - ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp(); - private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>(); + RaftMember member; + private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig(); + private 
boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp(); + List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>(); private ExecutorService executorService; private static ExecutorService serializationService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build()); + public static int bindingThreadNum = 1; public LogDispatcher(RaftMember member) { this.member = member; executorService = Executors.newCachedThreadPool(); + createQueueAndBindingThreads(); + } + + void createQueueAndBindingThreads() { for (Node node : member.getAllNodes()) { if (!node.equals(member.getThisNode())) { nodeLogQueues.add(createQueueAndBindingThread(node)); @@ -129,17 +135,20 @@ public class LogDispatcher { } } - private BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) { + BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) { BlockingQueue<SendLogRequest> logBlockingQueue = new ArrayBlockingQueue<>( ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()); - int bindingThreadNum = 1; for (int i = 0; i < bindingThreadNum; i++) { - executorService.submit(new DispatcherThread(node, logBlockingQueue)); + executorService.submit(newDispatcherThread(node, logBlockingQueue)); } return logBlockingQueue; } + DispatcherThread newDispatcherThread(Node node, BlockingQueue<SendLogRequest> logBlockingQueue) { + return new DispatcherThread(node, logBlockingQueue); + } + public static class SendLogRequest { private Log log; @@ -219,7 +228,7 @@ public class LogDispatcher { class DispatcherThread implements Runnable { - private Node receiver; + Node receiver; private BlockingQueue<SendLogRequest> logBlockingDeque; private List<SendLogRequest> currBatch = new ArrayList<>(); private Peer peer; @@ -238,9 +247,11 @@ public class LogDispatcher { Thread.currentThread().setName("LogDispatcher-" + member.getName() + "-" + receiver); 
try { while (!Thread.interrupted()) { - SendLogRequest poll = logBlockingDeque.take(); - currBatch.add(poll); - logBlockingDeque.drainTo(currBatch); + synchronized (logBlockingDeque) { + SendLogRequest poll = logBlockingDeque.take(); + currBatch.add(poll); + logBlockingDeque.drainTo(currBatch); + } if (logger.isDebugEnabled()) { logger.debug("Sending {} logs to {}", currBatch.size(), receiver); } @@ -388,7 +399,7 @@ public class LogDispatcher { } } - private void sendLog(SendLogRequest logRequest) { + void sendLog(SendLogRequest logRequest) { Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart( logRequest.getLog().getCreateTime()); member.sendLogToFollower( diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java index 446eefc..eae40e1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java @@ -420,6 +420,25 @@ public abstract class RaftLogManager { return -1; } + public long append(long lastIndex, long lastTerm, long leaderCommit, Log entry) { + long newLastIndex = lastIndex + 1; + if (entry.getCurrLogIndex() <= commitIndex) { + logger.debug( + "{}: entry {} conflict with committed entry [commitIndex({})]", + name, + entry.getCurrLogIndex(), + commitIndex); + } else { + append(entry); + } + try { + commitTo(Math.min(leaderCommit, newLastIndex)); + } catch (LogExecutionException e) { + // exceptions are ignored on follower side + } + return newLastIndex; + } + /** * Used by leader node or MaybeAppend to directly append to unCommittedEntryManager. Note that the * caller should ensure entries[0].index > committed. 
@@ -466,6 +485,20 @@ public abstract class RaftLogManager { return getLastLogIndex(); } + public long appendDirectly(Log entry) { + long after = entry.getCurrLogIndex(); + if (after <= commitIndex) { + logger.error("{}: after({}) is out of range [commitIndex({})]", name, after, commitIndex); + return -1; + } + getUnCommittedEntryManager().truncateAndAppend(entry); + Object logUpdateCondition = getLogUpdateCondition(entry.getCurrLogIndex()); + synchronized (logUpdateCondition) { + logUpdateCondition.notifyAll(); + } + return getLastLogIndex(); + } + /** * Used by leader node to try to commit entries. * diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java index a3026c9..c85e709 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java @@ -185,10 +185,10 @@ public class UnCommittedEntryManager { * @param appendingEntry request entry */ void truncateAndAppend(Log appendingEntry) { - if (maybeTerm(appendingEntry.getCurrLogIndex()) == appendingEntry.getCurrLogTerm()) { - // skip existing entry - return; - } +// if (maybeTerm(appendingEntry.getCurrLogIndex()) == appendingEntry.getCurrLogTerm()) { +// // skip existing entry +// return; +// } long after = appendingEntry.getCurrLogIndex(); long len = after - offset; @@ -203,8 +203,8 @@ public class UnCommittedEntryManager { } else { // clear conflict entries // then append - logger.info( - "truncate the entries after index {}, append a new entry {}", after, appendingEntry); +// logger.info( +// "truncate the entries after index {}, append a new entry {}", after, appendingEntry); int truncateIndex = (int) (after - offset); if (truncateIndex < entries.size()) { entries.subList(truncateIndex, entries.size()).clear(); diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java index c81e6d0..a945a92 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java @@ -431,7 +431,7 @@ public class SyncLogDequeSerializer implements StableEntryManager { @Override public void setHardStateAndFlush(HardState state) { this.state = state; - serializeMeta(meta); + // serializeMeta(meta); } @Override diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index e4c81f8..d88d1a3 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -521,7 +521,7 @@ public class DataClusterServer extends RaftServer } @Override - TProcessor getProcessor() { + protected TProcessor getProcessor() { if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { return new AsyncProcessor<>(this); } else { @@ -530,7 +530,7 @@ public class DataClusterServer extends RaftServer } @Override - TServerTransport getServerSocket() throws TTransportException { + protected TServerTransport getServerSocket() throws TTransportException { logger.info( "[{}] Cluster node will listen {}:{}", getServerClientName(), @@ -547,12 +547,12 @@ public class DataClusterServer extends RaftServer } @Override - String getClientThreadPrefix() { + protected String getClientThreadPrefix() { return "DataClientThread-"; } @Override - String getServerClientName() { + protected String getServerClientName() { return "DataServerThread-"; } diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java index 12e286f..ddc64cc 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java @@ -74,27 +74,31 @@ public class MetaClusterServer extends RaftServer private static Logger logger = LoggerFactory.getLogger(MetaClusterServer.class); // each node only contains one MetaGroupMember - private MetaGroupMember member; - private Coordinator coordinator; + protected MetaGroupMember member; + protected Coordinator coordinator; // the single-node IoTDB instance private IoTDB ioTDB; // to register the ClusterMonitor that helps monitoring the cluster private RegisterManager registerManager = new RegisterManager(); - private MetaAsyncService asyncService; - private MetaSyncService syncService; - private MetaHeartbeatServer metaHeartbeatServer; + protected MetaAsyncService asyncService; + protected MetaSyncService syncService; + protected MetaHeartbeatServer metaHeartbeatServer; public MetaClusterServer() throws QueryProcessException { super(); metaHeartbeatServer = new MetaHeartbeatServer(thisNode, this); coordinator = new Coordinator(); - member = new MetaGroupMember(protocolFactory, thisNode, coordinator); + member = createMetaGroupMember(); coordinator.setMetaGroupMember(member); asyncService = new MetaAsyncService(member); syncService = new MetaSyncService(member); MetaPuller.getInstance().init(member); } + protected MetaGroupMember createMetaGroupMember() throws QueryProcessException { + return new MetaGroupMember(protocolFactory, thisNode, coordinator); + } + /** * Besides the standard RaftServer start-up, the IoTDB instance, a MetaGroupMember and the * ClusterMonitor are also started. 
@@ -150,7 +154,7 @@ public class MetaClusterServer extends RaftServer * @throws TTransportException if create the socket fails */ @Override - TServerTransport getServerSocket() throws TTransportException { + protected TServerTransport getServerSocket() throws TTransportException { logger.info( "[{}] Cluster node will listen {}:{}", getServerClientName(), @@ -167,17 +171,17 @@ public class MetaClusterServer extends RaftServer } @Override - String getClientThreadPrefix() { + protected String getClientThreadPrefix() { return "MetaClientThread-"; } @Override - String getServerClientName() { + protected String getServerClientName() { return "MetaServerThread-"; } @Override - TProcessor getProcessor() { + protected TProcessor getProcessor() { if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { return new AsyncProcessor<>(this); } else { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java index 09956d2..405c6e2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java @@ -71,9 +71,9 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService. private TServerTransport socket; // RPC processing server private TServer poolServer; - Node thisNode; + protected Node thisNode; - TProtocolFactory protocolFactory = + protected TProtocolFactory protocolFactory = config.isRpcThriftCompressionEnabled() ? new TCompactProtocol.Factory() : new TBinaryProtocol.Factory(); @@ -92,7 +92,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService. thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress()); } - RaftServer(Node thisNode) { + protected RaftServer(Node thisNode) { this.thisNode = thisNode; } @@ -167,14 +167,14 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService. 
* @return An AsyncProcessor that contains the extended interfaces of a non-abstract subclass of * RaftService (DataService or MetaService). */ - abstract TProcessor getProcessor(); + protected abstract TProcessor getProcessor(); /** * @return A socket that will be used to establish a thrift server to listen to RPC requests. * DataServer and MetaServer use different port, so this is to be determined. * @throws TTransportException */ - abstract TServerTransport getServerSocket() throws TTransportException; + protected abstract TServerTransport getServerSocket() throws TTransportException; /** * Each thrift RPC request will be processed in a separate thread and this will return the name @@ -183,7 +183,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService. * * @return name prefix of RPC processing threads. */ - abstract String getClientThreadPrefix(); + protected abstract String getClientThreadPrefix(); /** * The thrift server will be run in a separate thread, and this will be its name. It help you @@ -191,7 +191,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService. * * @return The name of the thread running the thrift server. 
*/ - abstract String getServerClientName(); + protected abstract String getServerClientName(); private TServer createAsyncServer() throws TTransportException { socket = getServerSocket(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java index de0b83f..6d95e06 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java @@ -111,7 +111,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse> if (lastLogIdx == peer.getLastHeartBeatIndex()) { // the follower's lastLogIndex is unchanged, increase inconsistent counter int inconsistentNum = peer.incInconsistentHeartbeatNum(); - if (inconsistentNum >= 5) { + if (inconsistentNum >= 10000) { logger.info( "{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}", memberName, diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java new file mode 100644 index 0000000..71903d4 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.server.handlers.forwarder; + +import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; +import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.thrift.async.AsyncMethodCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IndirectAppendHandler implements AsyncMethodCallback<Long> { + + private static final Logger logger = LoggerFactory.getLogger(IndirectAppendHandler.class); + private Node receiver; + private AppendEntryRequest request; + + public IndirectAppendHandler(Node receiver, + AppendEntryRequest request) { + this.receiver = receiver; + this.request = request; + } + + @Override + public void onComplete(Long response) { + // ignore response from indirect appender + } + + @Override + public void onError(Exception exception) { + logger.warn("Cannot send request {} to {}", request, receiver, exception); + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java index 0459fef..50a46bb 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java @@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.server.handlers.caller.HeartbeatHandler; import org.apache.iotdb.cluster.server.member.RaftMember; import org.apache.iotdb.cluster.utils.ClientUtils; +import org.apache.iotdb.cluster.utils.ClusterUtils; import 
org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -152,7 +153,7 @@ public class HeartbeatThread implements Runnable { synchronized (nodes) { // avoid concurrent modification for (Node node : nodes) { - if (node.equals(localMember.getThisNode())) { + if (ClusterUtils.isNodeEquals(node, localMember.getThisNode())) { continue; } if (Thread.currentThread().isInterrupted()) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java index ed99c3d..e744fdb 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java @@ -74,8 +74,8 @@ public class MetaHeartbeatServer extends HeartbeatServer { } else { return new TServerSocket( new InetSocketAddress( - config.getInternalIp(), - config.getInternalMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET)); + metaClusterServer.getMember().getThisNode().getInternalIp(), + metaClusterServer.getMember().getThisNode().getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET)); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java index 002c4fe..2c5cf35 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java @@ -40,7 +40,15 @@ public class MetaHeartbeatThread extends HeartbeatThread { request.setRequireIdentifier(!node.isSetNodeIdentifier()); synchronized (localMetaMember.getIdConflictNodes()) { request.unsetRegenerateIdentifier(); - if (localMetaMember.getIdConflictNodes().contains(node)) { + boolean hasIdConflict = false; + for 
(Node idConflictNode : localMetaMember.getIdConflictNodes()) { + if (idConflictNode.getInternalIp().equals(node.internalIp) + && idConflictNode.getMetaPort() == node.metaPort) { + hasIdConflict = true; + break; + } + } + if (hasIdConflict) { request.setRegenerateIdentifier(true); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index eb10d44..056a485 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -19,6 +19,7 @@ package org.apache.iotdb.cluster.server.member; +import java.util.Comparator; import org.apache.iotdb.cluster.client.DataClientProvider; import org.apache.iotdb.cluster.client.async.AsyncClientPool; import org.apache.iotdb.cluster.client.async.AsyncMetaClient; @@ -1021,21 +1022,26 @@ public class MetaGroupMember extends RaftMember { ExecutorService pool = new ScheduledThreadPoolExecutor(getAllNodes().size() - 1); for (Node seedNode : getAllNodes()) { Node thisNode = getThisNode(); - if (seedNode.equals(thisNode)) { + if (ClusterUtils.isNodeEquals(thisNode, seedNode)) { continue; } pool.submit( () -> { - CheckStatusResponse response = checkStatus(seedNode); - if (response != null) { - // check the response - ClusterUtils.examineCheckStatusResponse( - response, consistentNum, inconsistentNum, seedNode); - } else { - logger.warn( - "Start up exception. Cannot connect to node {}. Try again in next turn.", - seedNode); + try { + CheckStatusResponse response = checkStatus(seedNode); + if (response != null) { + // check the response + ClusterUtils.examineCheckStatusResponse( + response, consistentNum, inconsistentNum, seedNode); + } else { + logger.warn( + "Start up exception. Cannot connect to node {}. 
Try again in next turn.", + seedNode); + } + } catch (Exception e) { + logger.error("Start up exception:", e); } + }); } pool.shutdown(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index f2187f9..daa3f4e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -19,6 +19,7 @@ package org.apache.iotdb.cluster.server.member; +import java.util.Collections; import org.apache.iotdb.cluster.client.async.AsyncClientPool; import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor; import org.apache.iotdb.cluster.client.sync.SyncClientPool; @@ -39,6 +40,7 @@ import org.apache.iotdb.cluster.log.catchup.CatchUpTask; import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog; import org.apache.iotdb.cluster.log.manage.RaftLogManager; import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest; +import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement; import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest; import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq; @@ -53,6 +55,7 @@ import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.Response; import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; +import org.apache.iotdb.cluster.server.handlers.forwarder.IndirectAppendHandler; import org.apache.iotdb.cluster.server.monitor.Peer; import org.apache.iotdb.cluster.server.monitor.Timer; import org.apache.iotdb.cluster.server.monitor.Timer.Statistic; @@ -77,7 +80,9 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSStatus; import com.google.common.util.concurrent.ThreadFactoryBuilder; 
+import org.apache.iotdb.tsfile.utils.Pair; import org.apache.thrift.TException; +import org.checkerframework.checker.units.qual.A; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,12 +112,14 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; /** - * RaftMember process the common raft logic like leader election, log appending, catch-up and so on. + * RaftMember process the common raft logic like leader election, log appending, catch-up and so + * on. */ @SuppressWarnings("java:S3077") // reference volatile is enough public abstract class RaftMember { + private static final Logger logger = LoggerFactory.getLogger(RaftMember.class); - public static final boolean USE_LOG_DISPATCHER = false; + public static boolean USE_LOG_DISPATCHER = false; private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out"; private static final String MSG_FORWARD_ERROR = @@ -132,19 +139,29 @@ public abstract class RaftMember { * on this may be woken. */ private final Object waitLeaderCondition = new Object(); - /** the lock is to make sure that only one thread can apply snapshot at the same time */ + /** + * the lock is to make sure that only one thread can apply snapshot at the same time + */ private final Object snapshotApplyLock = new Object(); protected Node thisNode = ClusterConstant.EMPTY_NODE; - /** the nodes that belong to the same raft group as thisNode. */ + /** + * the nodes that belong to the same raft group as thisNode. + */ protected List<Node> allNodes; ClusterConfig config = ClusterDescriptor.getInstance().getConfig(); - /** the name of the member, to distinguish several members in the logs. */ + /** + * the name of the member, to distinguish several members in the logs. + */ String name; - /** to choose nodes to send request of joining cluster randomly. */ + /** + * to choose nodes to send request of joining cluster randomly. 
+ */ Random random = new Random(); - /** when the node is a leader, this map is used to track log progress of each follower. */ + /** + * when the node is a leader, this map is used to track log progress of each follower. + */ Map<Node, Peer> peerMap; /** * the current term of the node, this object also works as lock of some transactions of the member @@ -165,8 +182,10 @@ public abstract class RaftMember { * offline. */ volatile long lastHeartbeatReceivedTime; - /** the raft logs are all stored and maintained in the log manager */ - RaftLogManager logManager; + /** + * the raft logs are all stored and maintained in the log manager + */ + protected RaftLogManager logManager; /** * the single thread pool that runs the heartbeat thread, which send heartbeats to the follower * when this node is a leader, or start elections when this node is an elector. @@ -183,7 +202,9 @@ public abstract class RaftMember { * member by comparing it with the current last log index. */ long lastReportedLogIndex; - /** the thread pool that runs catch-up tasks */ + /** + * the thread pool that runs catch-up tasks + */ private ExecutorService catchUpService; /** * lastCatchUpResponseTime records when is the latest response of each node's catch-up. There @@ -219,20 +240,30 @@ public abstract class RaftMember { * one slow node. */ private ExecutorService serialToParallelPool; - /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */ + /** + * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread + */ private ExecutorService commitLogPool; /** * logDispatcher buff the logs orderly according to their log indexes and send them sequentially, - * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs. + * which avoids the followers receiving out-of-order logs, forcing them to wait for previous + * logs. 
*/ private LogDispatcher logDispatcher; /** - * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB + * localExecutor is used to directly execute plans like load configuration in the underlying + * IoTDB */ protected PlanExecutor localExecutor; - protected RaftMember() {} + /** + * (logIndex, logTerm) -> append handler + */ + protected Map<Pair<Long, Long>, AppendNodeEntryHandler> sentLogHandlers = new ConcurrentHashMap<>(); + + protected RaftMember() { + } protected RaftMember( String name, @@ -527,10 +558,68 @@ public abstract class RaftMember { long result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log); logger.debug("{} AppendEntryRequest of {} completed with result {}", name, log, result); + if (!request.isFromLeader) { + appendAckLeader(request.leader, log, result); + } + return result; } - /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */ + private void appendAckLeader(Node leader, Log log, long response) { + AppendEntryAcknowledgement appendEntryAcknowledgement = new AppendEntryAcknowledgement(); + appendEntryAcknowledgement.index = log.getCurrLogIndex(); + appendEntryAcknowledgement.term = log.getCurrLogTerm(); + appendEntryAcknowledgement.response = response; + + Client syncClient = null; + try { + if (config.isUseAsyncServer()) { + GenericHandler<Void> handler = new GenericHandler<>(leader, null); + getAsyncClient(leader).acknowledgeAppendEntry(appendEntryAcknowledgement, handler); + } else { + syncClient = getSyncClient(leader); + syncClient.acknowledgeAppendEntry(appendEntryAcknowledgement); + } + } catch (TException e) { + logger.warn("Cannot send ack of {} to leader {}", log, leader, e); + } finally { + if (syncClient != null) { + ClientUtils.putBackSyncClient(syncClient); + } + } + } + + public long appendEntryIndirect(AppendEntryRequest request, List<Node> subFollowers) throws UnknownLogTypeException { + long result = 
appendEntry(request); + appendLogThreadPool.submit(() -> sendLogToSubFollowers(request, subFollowers)); + return result; + } + + private void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) { + request.setIsFromLeader(false); + for (Node subFollower : subFollowers) { + Client syncClient = null; + try { + if (config.isUseAsyncServer()) { + getAsyncClient(subFollower).appendEntry(request, new IndirectAppendHandler(subFollower, + request)); + } else { + syncClient = getSyncClient(subFollower); + syncClient.appendEntry(request); + } + } catch (TException e) { + logger.error("Cannot send {} to {}", request, subFollower, e); + } finally { + if (syncClient != null) { + ClientUtils.putBackSyncClient(syncClient); + } + } + } + } + + /** + * Similar to appendEntry, while the incoming load is batch of logs instead of a single log. + */ public long appendEntries(AppendEntriesRequest request) throws UnknownLogTypeException { logger.debug("{} received an AppendEntriesRequest", name); @@ -603,13 +692,17 @@ public abstract class RaftMember { AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, AppendEntryRequest request, - Peer peer) { + Peer peer, List<Node> indirectReceivers) { AsyncClient client = getSendLogAsyncClient(node); if (client != null) { AppendNodeEntryHandler handler = getAppendNodeEntryHandler(log, voteCounter, node, leaderShipStale, newLeaderTerm, peer); try { - client.appendEntry(request, handler); + if (indirectReceivers == null || indirectReceivers.isEmpty()) { + client.appendEntry(request, handler); + } else { + client.appendEntryIndirect(request, indirectReceivers, handler); + } logger.debug("{} sending a log to {}: {}", name, node, log); } catch (Exception e) { logger.warn("{} cannot append log to node {}", name, node, e); @@ -680,16 +773,22 @@ public abstract class RaftMember { return lastCatchUpResponseTime; } - /** Sub-classes will add their own process of HeartBeatResponse in this method. 
*/ - public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {} + /** + * Sub-classes will add their own process of HeartBeatResponse in this method. + */ + public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) { + } - /** The actions performed when the node wins in an election (becoming a leader). */ - public void onElectionWins() {} + /** + * The actions performed when the node wins in an election (becoming a leader). + */ + public void onElectionWins() { + } /** * Update the followers' log by sending logs whose index >= followerLastMatchedLogIndex to the - * follower. If some of the required logs are removed, also send the snapshot. <br> - * notice that if a part of data is in the snapshot, then it is not in the logs. + * follower. If some of the required logs are removed, also send the snapshot. <br> notice that if + * a part of data is in the snapshot, then it is not in the logs. */ public void catchUp(Node follower, long lastLogIdx) { // for one follower, there is at most one ongoing catch-up, so the same data will not be sent @@ -744,7 +843,7 @@ public abstract class RaftMember { * @param plan a non-query plan. * @return A TSStatus indicating the execution result. */ - abstract TSStatus executeNonQueryPlan(PhysicalPlan plan); + protected abstract TSStatus executeNonQueryPlan(PhysicalPlan plan); /** * according to the consistency configuration, decide whether to execute syncLeader or not and @@ -775,7 +874,9 @@ public abstract class RaftMember { } } - /** call back after syncLeader */ + /** + * call back after syncLeader + */ public interface CheckConsistency { /** @@ -784,7 +885,7 @@ public abstract class RaftMember { * @param leaderCommitId leader commit id * @param localAppliedId local applied id * @throws CheckConsistencyException maybe throw CheckConsistencyException, which is defined in - * implements. + * implements. 
*/ void postCheckConsistency(long leaderCommitId, long localAppliedId) throws CheckConsistencyException; @@ -793,8 +894,7 @@ public abstract class RaftMember { public static class MidCheckConsistency implements CheckConsistency { /** - * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw - * CHECK_MID_CONSISTENCY_EXCEPTION + * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw CHECK_MID_CONSISTENCY_EXCEPTION * * @param leaderCommitId leader commit id * @param localAppliedId local applied id @@ -806,7 +906,7 @@ public abstract class RaftMember { if (leaderCommitId == Long.MAX_VALUE || leaderCommitId == Long.MIN_VALUE || leaderCommitId - localAppliedId - > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) { + > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) { throw CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION; } } @@ -839,7 +939,7 @@ public abstract class RaftMember { * @param checkConsistency check after syncleader * @return true if the node has caught up, false otherwise * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold - * value after timeout + * value after timeout */ public boolean syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException { if (character == NodeCharacter.LEADER) { @@ -858,7 +958,9 @@ public abstract class RaftMember { return waitUntilCatchUp(checkConsistency); } - /** Wait until the leader of this node becomes known or time out. */ + /** + * Wait until the leader of this node becomes known or time out. 
+ */ public void waitLeader() { long startTime = System.currentTimeMillis(); while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) { @@ -885,7 +987,7 @@ public abstract class RaftMember { * * @return true if this node has caught up before timeout, false otherwise * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold - * value after timeout + * value after timeout */ protected boolean waitUntilCatchUp(CheckConsistency checkConsistency) throws CheckConsistencyException { @@ -957,7 +1059,7 @@ public abstract class RaftMember { * call this method. Will commit the log locally and send it to followers * * @return OK if over half of the followers accept the log or null if the leadership is lost - * during the appending + * during the appending */ TSStatus processPlanLocally(PhysicalPlan plan) { if (USE_LOG_DISPATCHER) { @@ -1156,7 +1258,9 @@ public abstract class RaftMember { return peerMap; } - /** @return true if there is a log whose index is "index" and term is "term", false otherwise */ + /** + * @return true if there is a log whose index is "index" and term is "term", false otherwise + */ public boolean matchLog(long index, long term) { boolean matched = logManager.matchTerm(term, index); logger.debug("Log {}-{} matched: {}", index, term, matched); @@ -1171,15 +1275,18 @@ public abstract class RaftMember { return syncLock; } - /** Sub-classes will add their own process of HeartBeatRequest in this method. */ - void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {} + /** + * Sub-classes will add their own process of HeartBeatRequest in this method. + */ + void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) { + } /** * Verify the validity of an ElectionRequest, and make itself a follower of the elector if the * request is valid. 
* * @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a - * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs. + * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs. */ long checkElectorLogProgress(ElectionRequest electionRequest) { @@ -1222,7 +1329,7 @@ public abstract class RaftMember { * lastLogIndex is smaller than the voter's Otherwise accept the election. * * @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a - * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs. + * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs. */ long checkLogProgress(long lastLogIndex, long lastLogTerm) { long response; @@ -1239,10 +1346,10 @@ public abstract class RaftMember { /** * Forward a non-query plan to a node using the default client. * - * @param plan a non-query plan - * @param node cannot be the local node + * @param plan a non-query plan + * @param node cannot be the local node * @param header must be set for data group communication, set to null for meta group - * communication + * communication * @return a TSStatus indicating if the forwarding is successful. */ public TSStatus forwardPlan(PhysicalPlan plan, Node node, Node header) { @@ -1273,7 +1380,7 @@ public abstract class RaftMember { /** * Forward a non-query plan to "receiver" using "client". * - * @param plan a non-query plan + * @param plan a non-query plan * @param header to determine which DataGroupMember of "receiver" will process the request. * @return a TSStatus indicating if the forwarding is successful. */ @@ -1354,7 +1461,7 @@ public abstract class RaftMember { * Get an asynchronous thrift client of the given node. * * @return an asynchronous thrift client or null if the caller tries to connect the local node or - * the node cannot be reached. + * the node cannot be reached. 
*/ public AsyncClient getAsyncClient(Node node) { return getAsyncClient(node, asyncClientPool, true); @@ -1541,7 +1648,7 @@ public abstract class RaftMember { * heartbeat timer. * * @param fromLeader true if the request is from a leader, false if the request is from an - * elector. + * elector. */ public void stepDown(long newTerm, boolean fromLeader) { synchronized (term) { @@ -1572,7 +1679,9 @@ public abstract class RaftMember { this.thisNode = thisNode; } - /** @return the header of the data raft group or null if this is in a meta group. */ + /** + * @return the header of the data raft group or null if this is in a meta group. + */ public Node getHeader() { return null; } @@ -1653,7 +1762,7 @@ public abstract class RaftMember { * success. * * @param requiredQuorum the number of votes needed to make the log valid, when requiredQuorum <= - * 0, half of the cluster size will be used. + * 0, half of the cluster size will be used. * @return an AppendLogResult */ private AppendLogResult sendLogToFollowers(Log log, int requiredQuorum) { @@ -1722,7 +1831,6 @@ public abstract class RaftMember { return waitAppendResult(voteCounter, leaderShipStale, newLeaderTerm); } - /** Send "log" to "node". */ public void sendLogToFollower( Log log, AtomicInteger voteCounter, @@ -1730,6 +1838,21 @@ public abstract class RaftMember { AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, AppendEntryRequest request) { + sendLogToFollower(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, + Collections.emptyList()); + } + + /** + * Send "log" to "node". 
+ */ + public void sendLogToFollower( + Log log, + AtomicInteger voteCounter, + Node node, + AtomicBoolean leaderShipStale, + AtomicLong newLeaderTerm, + AppendEntryRequest request, + List<Node> indirectReceivers) { if (node.equals(thisNode)) { return; } @@ -1750,9 +1873,9 @@ public abstract class RaftMember { } if (config.isUseAsyncServer()) { - sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer); + sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer, indirectReceivers); } else { - sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer); + sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer, indirectReceivers); } } @@ -1791,14 +1914,20 @@ public abstract class RaftMember { AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, AppendEntryRequest request, - Peer peer) { + Peer peer, List<Node> indirectReceivers) { Client client = getSyncClient(node); if (client != null) { AppendNodeEntryHandler handler = getAppendNodeEntryHandler(log, voteCounter, node, leaderShipStale, newLeaderTerm, peer); try { logger.debug("{} sending a log to {}: {}", name, node, log); - long result = client.appendEntry(request); + long result; + if (indirectReceivers == null || indirectReceivers.isEmpty()) { + result = client.appendEntry(request); + } else { + result = client.appendEntryIndirect(request, indirectReceivers); + } + handler.onComplete(result); } catch (TException e) { client.getInputProtocol().getTransport().close(); @@ -1843,9 +1972,9 @@ public abstract class RaftMember { * and append "log" to it. Otherwise report a log mismatch. * * @return Response.RESPONSE_AGREE when the log is successfully appended or Response - * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found. + * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found. 
*/ - private long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) { + protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) { long resp = checkPrevLogIndex(prevLogIndex); if (resp != Response.RESPONSE_AGREE) { return resp; @@ -1867,7 +1996,9 @@ public abstract class RaftMember { return resp; } - /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */ + /** + * Wait until all logs before "prevLogIndex" arrive or a timeout is reached. + */ private boolean waitForPrevLog(long prevLogIndex) { long waitStart = System.currentTimeMillis(); long alreadyWait = 0; @@ -1893,7 +2024,7 @@ public abstract class RaftMember { return alreadyWait <= RaftServer.getWriteOperationTimeoutMS(); } - private long checkPrevLogIndex(long prevLogIndex) { + protected long checkPrevLogIndex(long prevLogIndex) { long lastLogIndex = logManager.getLastLogIndex(); long startTime = Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime(); if (lastLogIndex < prevLogIndex && !waitForPrevLog(prevLogIndex)) { @@ -1913,7 +2044,7 @@ public abstract class RaftMember { * * @param logs append logs * @return Response.RESPONSE_AGREE when the log is successfully appended or Response - * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found. + * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found. */ private long appendEntries( long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) { @@ -1987,4 +2118,25 @@ public abstract class RaftMember { TIME_OUT, LEADERSHIP_STALE } + + /** + * Process the result from an indirect receiver of an entry. + * @param ack acknowledgement from an indirect receiver. 
+ */ + public void acknowledgeAppendLog(AppendEntryAcknowledgement ack) { + AppendNodeEntryHandler appendNodeEntryHandler = sentLogHandlers + .get(new Pair<>(ack.index, ack.term)); + if (appendNodeEntryHandler != null) { + appendNodeEntryHandler.onComplete(ack.response); + } + } + + public void registerAppendLogHandler(Pair<Long, Long> indexTerm, + AppendNodeEntryHandler appendNodeEntryHandler) { + sentLogHandlers.put(indexTerm, appendNodeEntryHandler); + } + + public void removeAppendLogHandler(Pair<Long, Long> indexTerm) { + sentLogHandlers.remove(indexTerm); + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java index 8673078..eecf26e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java @@ -19,9 +19,11 @@ package org.apache.iotdb.cluster.server.service; +import java.util.List; import org.apache.iotdb.cluster.exception.LeaderUnknownException; import org.apache.iotdb.cluster.exception.UnknownLogTypeException; import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest; +import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement; import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest; import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq; @@ -173,4 +175,21 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface { resultHandler.onError(e); } } + + @Override + public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack, + AsyncMethodCallback<Void> resultHandler) { + member.acknowledgeAppendLog(ack); + resultHandler.onComplete(null); + } + + @Override + public void appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers, + AsyncMethodCallback<Long> resultHandler) { + try { + 
resultHandler.onComplete(member.appendEntryIndirect(request, subReceivers)); + } catch (UnknownLogTypeException e) { + resultHandler.onError(e); + } + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java index ce200ab..181ad8a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java @@ -19,9 +19,16 @@ package org.apache.iotdb.cluster.server.service; +import java.io.File; +import java.io.IOException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.List; import org.apache.iotdb.cluster.exception.LeaderUnknownException; import org.apache.iotdb.cluster.exception.UnknownLogTypeException; import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest; +import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement; import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest; import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq; @@ -31,30 +38,21 @@ import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftService; import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client; import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse; -import org.apache.iotdb.cluster.server.NodeCharacter; import org.apache.iotdb.cluster.server.member.RaftMember; import org.apache.iotdb.cluster.utils.ClientUtils; import org.apache.iotdb.cluster.utils.IOUtils; -import org.apache.iotdb.cluster.utils.StatusUtils; import org.apache.iotdb.service.rpc.thrift.TSStatus; - import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import 
java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; -import java.nio.file.Files; - public abstract class BaseSyncService implements RaftService.Iface { private static final Logger logger = LoggerFactory.getLogger(BaseSyncService.class); RaftMember member; String name; - BaseSyncService(RaftMember member) { + protected BaseSyncService(RaftMember member) { this.member = member; this.name = member.getName(); } @@ -153,29 +151,25 @@ public abstract class BaseSyncService implements RaftService.Iface { @Override public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException { - if (member.getCharacter() != NodeCharacter.LEADER) { - // forward the plan to the leader - Client client = member.getSyncClient(member.getLeader()); - if (client != null) { - TSStatus status; - try { - status = client.executeNonQueryPlan(request); - } catch (TException e) { - client.getInputProtocol().getTransport().close(); - throw e; - } finally { - ClientUtils.putBackSyncClient(client); - } - return status; - } else { - return StatusUtils.NO_LEADER; - } - } - try { return member.executeNonQueryPlan(request); } catch (Exception e) { throw new TException(e); } } + + @Override + public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack) { + member.acknowledgeAppendLog(ack); + } + + @Override + public long appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers) + throws TException { + try { + return member.appendEntryIndirect(request, subReceivers); + } catch (UnknownLogTypeException e) { + throw new TException(e); + } + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java index 255cb4b..0de64a6 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java @@ -19,6 +19,7 @@ package org.apache.iotdb.cluster.utils; +import java.util.Objects; 
import org.apache.iotdb.cluster.client.async.AsyncDataClient; import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient; import org.apache.iotdb.cluster.client.async.AsyncMetaClient; @@ -27,6 +28,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient; import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient; import org.apache.iotdb.cluster.client.sync.SyncMetaClient; import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient; +import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient; import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java index be87eb1..905019f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java @@ -58,7 +58,7 @@ public class ClusterNode extends Node { && this.dataPort == that.dataPort && this.metaPort == that.metaPort && this.clientPort == that.clientPort - && this.clientIp.equals(that.clientIp); + && Objects.equals(this.clientIp, that.clientIp); } @Override diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java index 4c64587..ef656fe 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java @@ -175,6 +175,18 @@ public class ClusterUtils { } } + public static boolean isNodeEquals(Node node1, Node node2) { + if (node1 == node2) { + return true; + } + if (node1 == null || node2 == null) { + return false; + } + return Objects.equals(node1.internalIp, node2.internalIp) && Objects.equals(node1.metaPort, + node2.metaPort); + } + + private static boolean seedNodesContains(List<Node> 
seedNodeList, List<Node> subSeedNodeList) { // Because identifier is not compared here, List.contains() is not suitable if (subSeedNodeList == null) { diff --git a/pom.xml b/pom.xml index aed72ae..879a584 100644 --- a/pom.xml +++ b/pom.xml @@ -621,32 +621,32 @@ </excludes> </configuration> </plugin> - <plugin> - <groupId>com.diffplug.spotless</groupId> - <artifactId>spotless-maven-plugin</artifactId> - <version>${spotless.version}</version> - <configuration> - <java> - <googleJavaFormat> - <version>1.7</version> - <style>GOOGLE</style> - </googleJavaFormat> - <importOrder> - <order>org.apache.iotdb,,javax,java,\#</order> - </importOrder> - <removeUnusedImports/> - </java> - </configuration> - <executions> - <execution> - <id>spotless-check</id> - <phase>validate</phase> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - </plugin> + <!-- <plugin>--> + <!-- <groupId>com.diffplug.spotless</groupId>--> + <!-- <artifactId>spotless-maven-plugin</artifactId>--> + <!-- <version>${spotless.version}</version>--> + <!-- <configuration>--> + <!-- <java>--> + <!-- <googleJavaFormat>--> + <!-- <version>1.7</version>--> + <!-- <style>GOOGLE</style>--> + <!-- </googleJavaFormat>--> + <!-- <importOrder>--> + <!-- <order>org.apache.iotdb,,javax,java,\#</order>--> + <!-- </importOrder>--> + <!-- <removeUnusedImports/>--> + <!-- </java>--> + <!-- </configuration>--> + <!-- <executions>--> + <!-- <execution>--> + <!-- <id>spotless-check</id>--> + <!-- <phase>validate</phase>--> + <!-- <goals>--> + <!-- <goal>check</goal>--> + <!-- </goals>--> + <!-- </execution>--> + <!-- </executions>--> + <!-- </plugin>--> </plugins> </pluginManagement> <plugins> diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index d4d08c1..0ef0033 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ 
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -349,6 +349,8 @@ public class PlanExecutor implements IPlanExecutor { return createDeviceTemplate((CreateTemplatePlan) plan); case SET_DEVICE_TEMPLATE: return setDeviceTemplate((SetDeviceTemplatePlan) plan); + case EMPTY: + return true; default: throw new UnsupportedOperationException( String.format("operation %s is not supported", plan.getOperatorType())); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java index e1fb10f..fddda60 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java @@ -157,5 +157,6 @@ public abstract class Operator { CREATE_TEMPLATE, SET_DEVICE_TEMPLATE, SET_USING_DEVICE_TEMPLATE, + EMPTY, } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java index 4c5b1a1..457bd22 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java @@ -45,6 +45,7 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan; import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan; import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan; +import org.apache.iotdb.db.qp.physical.sys.ExprPlan; import org.apache.iotdb.db.qp.physical.sys.FlushPlan; import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan; import org.apache.iotdb.db.qp.physical.sys.MNodePlan; @@ -368,6 +369,9 @@ public abstract class PhysicalPlan { case AUTO_CREATE_DEVICE_MNODE: plan = new AutoCreateDeviceMNodePlan(); break; + case EXPR: + plan = new ExprPlan(); + break; default: throw new IOException("unrecognized log type " + type); } @@ -422,7 +426,8 @@ 
public abstract class PhysicalPlan { CREATE_TRIGGER, DROP_TRIGGER, START_TRIGGER, - STOP_TRIGGER + STOP_TRIGGER, + EXPR } public long getIndex() { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java new file mode 100644 index 0000000..2a1e16b --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.qp.physical.sys; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.qp.logical.Operator.OperatorType; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; + +public class ExprPlan extends PhysicalPlan { + + private byte[] workload; + private boolean needForward; + + public ExprPlan() { + super(false, OperatorType.EMPTY); + } + + @Override + public List<PartialPath> getPaths() { + return null; + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + stream.write((byte) PhysicalPlanType.EXPR.ordinal()); + stream.writeInt(workload == null ? 0 : workload.length); + if (workload != null) { + stream.write(workload); + } + stream.write(needForward ? 1 : 0); + } + + @Override + public void serialize(ByteBuffer buffer) { + buffer.put((byte) PhysicalPlanType.EXPR.ordinal()); + buffer.putInt(workload == null ? 0 : workload.length); + if (workload != null) { + buffer.put(workload); + } + buffer.put(needForward ? 
(byte) 1 : 0); + } + + @Override + public void deserialize(ByteBuffer buffer) throws IllegalPathException { + int size = buffer.getInt(); + workload = new byte[size]; + buffer.get(workload); + needForward = buffer.get() == 1; + } + + public void setWorkload(byte[] workload) { + this.workload = workload; + } + + public boolean isNeedForward() { + return needForward; + } + + public void setNeedForward(boolean needForward) { + this.needForward = needForward; + } +} diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift index f23130e..449e843 100644 --- a/thrift-cluster/src/main/thrift/cluster.thrift +++ b/thrift-cluster/src/main/thrift/cluster.thrift @@ -90,6 +90,16 @@ struct AppendEntryRequest { // because a data server may play many data groups members, this is used to identify which // member should process the request or response. Only used in data group communication. 7: optional Node header + // true if the request is sent from the leader, and the receiver just responds to the sender; + // otherwise, the receiver should also notify the leader + 8: optional bool isFromLeader +} + + +struct AppendEntryAcknowledgement { + 1: required long term + 2: required long index + 3: required long response } // leader -> follower @@ -306,6 +316,11 @@ service RaftService { **/ long appendEntry(1:AppendEntryRequest request) + /** + * Like appendEntry, but the receiver should forward the request to nodes in subReceivers. + **/ + long appendEntryIndirect(1:AppendEntryRequest request, 2:list<Node> subReceivers) + void sendSnapshot(1:SendSnapshotRequest request) /** @@ -337,6 +352,12 @@ service RaftService { * interface to notify the leader to remove the associated hardlink. 
**/ void removeHardLink(1: string hardLinkPath) + + /** + * When a follower receives an AppendEntryRequest from a non-leader node, it sends this ack to + * the leader so the leader can know whether it has successfully received the entry + **/ + void acknowledgeAppendEntry(1: AppendEntryAcknowledgement ack) }
