This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_new by this push:
new fbba351 fix peer null && fix wrong create timeseries plan serialization
new 67ce3d9 Merge pull request #1171 from LebronAl/cluster_new_wrong_serialization
fbba351 is described below
commit fbba351743c84aea6c24f273792948a120738c8b
Author: LebronAl <[email protected]>
AuthorDate: Fri May 8 14:28:54 2020 +0800
fix peer null && fix wrong create timeseries plan serialization
---
.../iotdb/cluster/server/member/RaftMember.java | 17 ++--
.../cluster/log/logtypes/SerializeLogTest.java | 15 +++
.../cluster/server/member/MetaGroupMemberTest.java | 10 ++
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 111 +++++++++++----------
4 files changed, 93 insertions(+), 60 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 5b56d14..bfd6db2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -231,7 +231,7 @@ public abstract class RaftMember implements
RaftService.AsyncIface {
// safety, otherwise it may come from an invalid leader and is not
committed
if (logManager.getCommitLogIndex() < request.getCommitLogIndex() &&
logManager.matchTerm(request.getCommitLogTerm(),
request.getCommitLogIndex())) {
- logger.info("{}: Committing to {}-{}, local: {}-{}, last: {}-{}",
name,
+ logger.info("{}: Committing to {}-{}, localCommit: {}-{},
localLast: {}-{}", name,
request.getCommitLogIndex(),
request.getCommitLogTerm(), logManager.getCommitLogIndex(),
logManager.getCommitLogTerm(), logManager.getLastLogIndex(),
@@ -409,7 +409,8 @@ public abstract class RaftMember implements
RaftService.AsyncIface {
logs.add(log);
}
- response = appendEntries(request.prevLogIndex, request.prevLogTerm,
request.leaderCommit, logs);
+ response = appendEntries(request.prevLogIndex, request.prevLogTerm,
request.leaderCommit,
+ logs);
resultHandler.onComplete(response);
logger.debug("{} AppendEntriesRequest of log size {} completed", name,
request.getEntries().size());
@@ -426,7 +427,8 @@ public abstract class RaftMember implements
RaftService.AsyncIface {
* @return Response.RESPONSE_AGREE when the log is successfully appended or
Response
* .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
- private long appendEntries(long prevLogIndex, long prevLogTerm, long
leaderCommit, List<Log> logs) {
+ private long appendEntries(long prevLogIndex, long prevLogTerm, long
leaderCommit,
+ List<Log> logs) {
if (logs.isEmpty()) {
return Response.RESPONSE_AGREE;
}
@@ -549,12 +551,8 @@ public abstract class RaftMember implements
RaftService.AsyncIface {
if (node.equals(thisNode)) {
continue;
}
- if (!peerMap.containsKey(node)) {
- logger.warn("{}'s peerMap lost the info of node {}, created a new
peer for using", name,
- node);
- peerMap.put(node, new Peer(logManager.getLastLogIndex()));
- }
- if (!peerMap.get(node).isCatchUp()) {
+ Peer peer = peerMap.computeIfAbsent(node, k -> new
Peer(logManager.getLastLogIndex()));
+ if (!peer.isCatchUp()) {
logger.warn("{} can't append log to node {} because it needs
catchUp", name, node);
continue;
}
@@ -565,6 +563,7 @@ public abstract class RaftMember implements
RaftService.AsyncIface {
handler.setVoteCounter(voteCounter);
handler.setLeaderShipStale(leaderShipStale);
handler.setLog(log);
+ handler.setPeer(peer);
handler.setReceiverTerm(newLeaderTerm);
try {
client.appendEntry(request, handler);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
index 1f76006..913317b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
@@ -23,13 +23,19 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.Test;
@@ -60,6 +66,15 @@ public class SerializeLogTest {
byteBuffer = log.serialize();
logPrime = LogParser.getINSTANCE().parse(byteBuffer);
assertEquals(log, logPrime);
+
+ log = new PhysicalPlanLog(new CreateTimeSeriesPlan(new
Path("root.applyMeta"
+ + ".s1"), TSDataType.DOUBLE, TSEncoding.RLE, CompressionType.SNAPPY,
+ new HashMap<String, String>() {{
+ put("MAX_POINT_NUMBER", "100");
+ }}, Collections.emptyMap(), Collections.emptyMap(), null));
+ byteBuffer = log.serialize();
+ logPrime = LogParser.getINSTANCE().parse(byteBuffer);
+ assertEquals(log, logPrime);
}
@Test
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index f936710..957eaf2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -70,6 +70,7 @@ import
org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
import org.apache.iotdb.cluster.server.DataClusterServer;
+import org.apache.iotdb.cluster.server.Peer;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
@@ -392,10 +393,19 @@ public class MetaGroupMemberTest extends MemberTest {
try {
Thread.sleep(100);
dummyResponse.set(Response.RESPONSE_AGREE);
+ DataGroupMember member = testMetaMember.getLocalDataMember(
+
testMetaMember.getPartitionTable().routeToHeaderByTime(TestUtils.getTestSg(0),
0));
+ for (Peer peer : member.getPeerMap().values()) {
+ peer.setCatchUp(true);
+ }
+ for (Peer peer : testMetaMember.getPeerMap().values()) {
+ peer.setCatchUp(true);
+ }
} catch (InterruptedException e) {
// ignore
}
}).start();
+
testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true);
assertTrue(processor.getWorkSequenceTsFileProcessors().isEmpty());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index ce719b7..55a6bd3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -19,11 +19,14 @@
package org.apache.iotdb.tsfile.utils;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.BINARY;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.BOOLEAN;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.DOUBLE;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.FLOAT;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.INTEGER;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.LONG;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.NULL;
+import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.STRING;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -31,10 +34,17 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
-
-import static
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.*;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
/**
* ConverterUtils is a utility class. It provide conversion between normal
datatype and byte array.
@@ -83,7 +93,7 @@ public class ReadWriteIOUtils {
* read bytes array in given size
*
* @param buffer buffer
- * @param size size
+ * @param size size
* @return bytes array
*/
public static byte[] readBytes(ByteBuffer buffer, int size) {
@@ -123,16 +133,16 @@ public class ReadWriteIOUtils {
public static int write(Map<String, String> map, DataOutputStream stream)
throws IOException {
int length = 0;
byte[] bytes;
- stream.write(map.size());
+ stream.writeInt(map.size());
length += 4;
for (Entry<String, String> entry : map.entrySet()) {
bytes = entry.getKey().getBytes();
- stream.write(bytes.length);
+ stream.writeInt(bytes.length);
length += 4;
stream.write(bytes);
length += bytes.length;
bytes = entry.getValue().getBytes();
- stream.write(bytes.length);
+ stream.writeInt(bytes.length);
length += 4;
stream.write(bytes);
length += bytes.length;
@@ -141,7 +151,6 @@ public class ReadWriteIOUtils {
}
-
/**
* write a int value to outputStream according to flag. If flag is true,
write 1, else write 0.
*/
@@ -224,7 +233,6 @@ public class ReadWriteIOUtils {
}
-
/**
* write the size (int) of the binary and then the bytes in binary
*/
@@ -329,7 +337,8 @@ public class ReadWriteIOUtils {
/**
* write byteBuffer.array to outputStream without capacity.
*/
- public static int writeWithoutSize(ByteBuffer byteBuffer, OutputStream
outputStream) throws IOException {
+ public static int writeWithoutSize(ByteBuffer byteBuffer, OutputStream
outputStream)
+ throws IOException {
byte[] bytes = byteBuffer.array();
outputStream.write(bytes);
return bytes.length;
@@ -624,7 +633,7 @@ public class ReadWriteIOUtils {
/**
* read bytes from byteBuffer, this method makes sure that you can read
length bytes or reach to
* the end of the buffer.
- *
+ * <p>
* read a int + buffer
*/
public static ByteBuffer readByteBufferWithSelfDescriptionLength(ByteBuffer
buffer) {
@@ -779,8 +788,8 @@ public class ReadWriteIOUtils {
/**
- * to check whether the byte buffer is reach the magic string
- * this method doesn't change the position of the byte buffer
+ * to check whether the byte buffer is reach the magic string this method
doesn't change the
+ * position of the byte buffer
*
* @param byteBuffer byte buffer
* @return whether the byte buffer is reach the magic string
@@ -793,8 +802,8 @@ public class ReadWriteIOUtils {
}
/**
- * to check whether the inputStream is reach the magic string
- * this method doesn't change the position of the inputStream
+ * to check whether the inputStream is reach the magic string this method
doesn't change the
+ * position of the inputStream
*
* @param inputStream inputStream
* @return whether the inputStream is reach the magic string
@@ -808,38 +817,38 @@ public class ReadWriteIOUtils {
}
public static void writeObject(Object value, DataOutputStream outputStream) {
- try {
- if (value instanceof Long) {
- outputStream.write(LONG.ordinal());
- outputStream.writeLong((Long) value);
- } else if (value instanceof Double) {
- outputStream.write(DOUBLE.ordinal());
- outputStream.writeDouble((Double) value);
- } else if (value instanceof Integer) {
- outputStream.write(INTEGER.ordinal());
- outputStream.writeInt((Integer) value);
- } else if (value instanceof Float) {
- outputStream.write(FLOAT.ordinal());
- outputStream.writeFloat((Float) value);
- } else if (value instanceof Binary) {
- outputStream.write(BINARY.ordinal());
- byte[] bytes = ((Binary) value).getValues();
- outputStream.writeInt(bytes.length);
- outputStream.write(bytes);
- } else if (value instanceof Boolean) {
- outputStream.write(BOOLEAN.ordinal());
- outputStream.write(((Boolean) value) ? 1 : 0);
- } else if (value == null) {
- outputStream.write(NULL.ordinal());
- } else {
- outputStream.write(STRING.ordinal());
- byte[] bytes = value.toString().getBytes();
- outputStream.writeInt(bytes.length);
- outputStream.write(bytes);
- }
- } catch (IOException ignored) {
- // ignored
+ try {
+ if (value instanceof Long) {
+ outputStream.write(LONG.ordinal());
+ outputStream.writeLong((Long) value);
+ } else if (value instanceof Double) {
+ outputStream.write(DOUBLE.ordinal());
+ outputStream.writeDouble((Double) value);
+ } else if (value instanceof Integer) {
+ outputStream.write(INTEGER.ordinal());
+ outputStream.writeInt((Integer) value);
+ } else if (value instanceof Float) {
+ outputStream.write(FLOAT.ordinal());
+ outputStream.writeFloat((Float) value);
+ } else if (value instanceof Binary) {
+ outputStream.write(BINARY.ordinal());
+ byte[] bytes = ((Binary) value).getValues();
+ outputStream.writeInt(bytes.length);
+ outputStream.write(bytes);
+ } else if (value instanceof Boolean) {
+ outputStream.write(BOOLEAN.ordinal());
+ outputStream.write(((Boolean) value) ? 1 : 0);
+ } else if (value == null) {
+ outputStream.write(NULL.ordinal());
+ } else {
+ outputStream.write(STRING.ordinal());
+ byte[] bytes = value.toString().getBytes();
+ outputStream.writeInt(bytes.length);
+ outputStream.write(bytes);
}
+ } catch (IOException ignored) {
+ // ignored
+ }
}
public static Object readObject(ByteBuffer buffer) {