This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_receiver_wait
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_receiver_wait by this
push:
new e1a3ef5 add lastHeartBeatIndex in Peer and PlanSerializer
e1a3ef5 is described below
commit e1a3ef5571cf484fa3471cb5ce99b0f3db72967c
Author: jt2594838 <[email protected]>
AuthorDate: Mon Aug 31 16:19:13 2020 +0800
add lastHeartBeatIndex in Peer and PlanSerializer
---
.../cluster/client/sync/SyncClientAdaptor.java | 8 ++---
.../java/org/apache/iotdb/cluster/server/Peer.java | 10 ++++++
.../server/handlers/caller/HeartbeatHandler.java | 18 ++++++++--
.../cluster/server/member/MetaGroupMember.java | 30 +++++++++++++----
.../iotdb/cluster/server/member/RaftMember.java | 11 +++---
.../apache/iotdb/cluster/utils/PlanSerializer.java | 39 ++++++++++++++++++++++
.../org/apache/iotdb/tsfile/utils/PublicBAOS.java | 5 ++-
7 files changed, 100 insertions(+), 21 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 0b02b23..6280e1f 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -51,10 +51,11 @@ import
org.apache.iotdb.cluster.server.handlers.caller.GetChildNodeNextLevelPath
import org.apache.iotdb.cluster.server.handlers.caller.GetNodesListHandler;
import
org.apache.iotdb.cluster.server.handlers.caller.GetTimeseriesSchemaHandler;
import org.apache.iotdb.cluster.server.handlers.caller.JoinClusterHandler;
-import org.apache.iotdb.cluster.server.handlers.caller.PullSnapshotHandler;
import
org.apache.iotdb.cluster.server.handlers.caller.PullMeasurementSchemaHandler;
+import org.apache.iotdb.cluster.server.handlers.caller.PullSnapshotHandler;
import
org.apache.iotdb.cluster.server.handlers.caller.PullTimeseriesSchemaHandler;
import org.apache.iotdb.cluster.server.handlers.forwarder.ForwardPlanHandler;
+import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -329,13 +330,10 @@ public class SyncClientAdaptor {
public static TSStatus executeNonQuery(AsyncClient client, PhysicalPlan
plan, Node header,
Node receiver) throws IOException, TException, InterruptedException {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
- plan.serialize(dataOutputStream);
AtomicReference<TSStatus> status = new AtomicReference<>();
ExecutNonQueryReq req = new ExecutNonQueryReq();
- req.setPlanBytes(byteArrayOutputStream.toByteArray());
+ req.planBytes = ByteBuffer.wrap(PlanSerializer.instance.serialize(plan));
if (header != null) {
req.setHeader(header);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Peer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/Peer.java
index 928c55f..8dfd087 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Peer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Peer.java
@@ -29,6 +29,8 @@ public class Peer {
// so just adding a volatile is enough to face the concurrency problem
private volatile boolean isCatchUp;
private AtomicInteger inconsistentHeartbeatNum = new AtomicInteger();
+ // lastLogIndex from the last heartbeat
+ private long lastHeartBeatIndex;
public Peer(long nextIndex) {
this.nextIndex = nextIndex;
@@ -69,4 +71,12 @@ public class Peer {
public void resetInconsistentHeartbeatNum() {
inconsistentHeartbeatNum.set(0);
}
+
+ public long getLastHeartBeatIndex() {
+ return lastHeartBeatIndex;
+ }
+
+ public void setLastHeartBeatIndex(long lastHeartBeatIndex) {
+ this.lastHeartBeatIndex = lastHeartBeatIndex;
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index 29b620c..fb83e69 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -69,6 +69,7 @@ public class HeartbeatHandler implements
AsyncMethodCallback<HeartBeatResponse>
.computeIfAbsent(follower, k -> new
Peer(localMember.getLogManager().getLastLogIndex()));
if (!peer.isCatchUp() || !localMember.getLogManager()
.isLogUpToDate(lastLogTerm, lastLogIdx)) {
+ // the follower is not up-to-date
peer.setNextIndex(lastLogIdx + 1);
logger.debug("{}: catching up node {}, index-term: {}-{}/{}-{}, peer
nextIndex {}, peer "
+ "match index {}",
@@ -77,14 +78,25 @@ public class HeartbeatHandler implements
AsyncMethodCallback<HeartBeatResponse>
localLastLogIdx, localLastLogTerm,
peer.getNextIndex(), peer.getMatchIndex());
- int inconsistentNum = peer.incInconsistentHeartbeatNum();
- if (inconsistentNum >= 5) {
- localMember.catchUp(follower);
+ // only start a catch up when the follower's lastLogIndex remains
stall and unchanged for 5
+ // heartbeats
+ if (lastLogIdx == peer.getLastHeartBeatIndex()) {
+ // the follower's lastLogIndex is unchanged, increase inconsistent
counter
+ int inconsistentNum = peer.incInconsistentHeartbeatNum();
+ if (inconsistentNum >= 5) {
+ localMember.catchUp(follower);
+ }
+ } else {
+ // the follower's lastLogIndex is changed, which means the follower
is not down yet, we
+ // reset the counter to see if it can eventually catch up by itself
+ peer.resetInconsistentHeartbeatNum();
}
} else {
+ // the follower is up-to-date
peer.setMatchIndex(Math.max(peer.getMatchIndex(), lastLogIdx));
peer.resetInconsistentHeartbeatNum();
}
+ peer.setLastHeartBeatIndex(lastLogIdx);
} else {
// current leadership is invalid because the follower has a larger term
synchronized (localMember.getTerm()) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index fc7d725..170ccaa 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -27,7 +27,6 @@ import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
@@ -143,6 +142,7 @@ import
org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.PartitionUtils;
import org.apache.iotdb.cluster.utils.PartitionUtils.Intervals;
+import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
@@ -182,6 +182,7 @@ import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -1915,15 +1916,32 @@ public class MetaGroupMember extends RaftMember {
return StatusUtils.TIME_OUT;
}
}
-
+ TSIService.Client cli;
+ long sId;
private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node
header) {
Client client = getSyncDataClient(receiver,
RaftServer.getWriteOperationTimeoutMS());
- try (ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream)) {
- plan.serialize(dataOutputStream);
+ try {
+
+// if (plan instanceof CreateTimeSeriesPlan) {
+// CreateTimeSeriesPlan timeSeriesPlan = (CreateTimeSeriesPlan) plan;
+// if (cli == null) {
+// TSocket tSocket = new TSocket("127.0.0.1", 6669);
+// tSocket.open();
+// cli = new TSIService.Client(new TBinaryProtocol(new
TFastFramedTransport(tSocket)));
+// TSOpenSessionReq tsOpenSessionReq = new TSOpenSessionReq(
+// TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+// tsOpenSessionReq.setUsername("root");
+// tsOpenSessionReq.setPassword("root");
+// TSOpenSessionResp tsOpenSessionResp =
cli.openSession(tsOpenSessionReq);
+// sId = tsOpenSessionResp.sessionId;
+// }
+// TSCreateTimeseriesReq re =
+// new TSCreateTimeseriesReq(sId,
timeSeriesPlan.getPath().getFullPath(), 0, 0, 0);
+// return cli.createTimeseries(re);
+// }
ExecutNonQueryReq req = new ExecutNonQueryReq();
- req.setPlanBytes(byteArrayOutputStream.toByteArray());
+ req.setPlanBytes(PlanSerializer.instance.serialize(plan));
if (header != null) {
req.setHeader(header);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 0cf0ff9..29d830b 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.cluster.server.member;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -87,6 +85,7 @@ import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.IoTDBException;
@@ -1149,6 +1148,7 @@ public abstract class RaftMember {
}
}
+
/**
* Forward a non-query plan to "receiver" using "client".
*
@@ -1181,12 +1181,10 @@ public abstract class RaftMember {
private TSStatus forwardPlanSync(PhysicalPlan plan, Node receiver, Node
header) {
Client client = getSyncClient(receiver);
- try (ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream)) {
- plan.serialize(dataOutputStream);
+ try {
ExecutNonQueryReq req = new ExecutNonQueryReq();
- req.setPlanBytes(byteArrayOutputStream.toByteArray());
+ req.setPlanBytes(PlanSerializer.instance.serialize(plan));
if (header != null) {
req.setHeader(header);
}
@@ -1344,6 +1342,7 @@ public abstract class RaftMember {
public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws
IOException {
// process the plan locally
PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
+
TSStatus answer = executeNonQuery(plan);
logger.debug("{}: Received a plan {}, executed answer: {}", name, plan,
answer);
return answer;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PlanSerializer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PlanSerializer.java
new file mode 100644
index 0000000..2dda132
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PlanSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this work
for additional information regarding copyright ownership. The ASF licenses
this file to you under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain a copy
of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required by applicable law or ag [...]
+ */
+
+package org.apache.iotdb.cluster.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.utils.CommonUtils;
+
+public class PlanSerializer {
+ private static final int DEFAULT_BAOS_SIZE = CommonUtils.getCpuCores() * 4;
+ private BlockingDeque<ByteArrayOutputStream> baosBlockingDeque = new
LinkedBlockingDeque<>();
+
+ public static PlanSerializer instance = new PlanSerializer();
+
+ private PlanSerializer() {
+ for (int i = 0; i < DEFAULT_BAOS_SIZE; i++) {
+ baosBlockingDeque.push(new ByteArrayOutputStream(4096));
+ }
+ }
+
+ public byte[] serialize(PhysicalPlan plan) throws IOException {
+ ByteArrayOutputStream poll = baosBlockingDeque.poll();
+ poll.reset();
+ DataOutputStream dataOutputStream = new DataOutputStream(poll);
+
+ try {
+ plan.serialize(dataOutputStream);
+ return poll.toByteArray();
+ } finally {
+ baosBlockingDeque.offer(poll);
+ }
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index d3f031a..79f587a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.tsfile.utils;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
/**
@@ -33,6 +32,10 @@ public class PublicBAOS extends ByteArrayOutputStream {
super();
}
+ public PublicBAOS(int size) {
+ super(size);
+ }
+
/**
* get current all bytes data
*