This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_new by this push:
     new fbba351  fix peer null && fix wrong create timeseries plan serialization
     new 67ce3d9  Merge pull request #1171 from LebronAl/cluster_new_wrong_serialization
fbba351 is described below

commit fbba351743c84aea6c24f273792948a120738c8b
Author: LebronAl <[email protected]>
AuthorDate: Fri May 8 14:28:54 2020 +0800

    fix peer null && fix wrong create timeseries plan serialization
---
 .../iotdb/cluster/server/member/RaftMember.java    |  17 ++--
 .../cluster/log/logtypes/SerializeLogTest.java     |  15 +++
 .../cluster/server/member/MetaGroupMemberTest.java |  10 ++
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       | 111 +++++++++++----------
 4 files changed, 93 insertions(+), 60 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 5b56d14..bfd6db2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -231,7 +231,7 @@ public abstract class RaftMember implements 
RaftService.AsyncIface {
           // safety, otherwise it may come from an invalid leader and is not 
committed
           if (logManager.getCommitLogIndex() < request.getCommitLogIndex() &&
               logManager.matchTerm(request.getCommitLogTerm(), 
request.getCommitLogIndex())) {
-            logger.info("{}: Committing to {}-{}, local: {}-{}, last: {}-{}", 
name,
+            logger.info("{}: Committing to {}-{}, localCommit: {}-{}, 
localLast: {}-{}", name,
                 request.getCommitLogIndex(),
                 request.getCommitLogTerm(), logManager.getCommitLogIndex(),
                 logManager.getCommitLogTerm(), logManager.getLastLogIndex(),
@@ -409,7 +409,8 @@ public abstract class RaftMember implements 
RaftService.AsyncIface {
         logs.add(log);
       }
 
-      response = appendEntries(request.prevLogIndex, request.prevLogTerm, 
request.leaderCommit, logs);
+      response = appendEntries(request.prevLogIndex, request.prevLogTerm, 
request.leaderCommit,
+          logs);
       resultHandler.onComplete(response);
       logger.debug("{} AppendEntriesRequest of log size {} completed", name,
           request.getEntries().size());
@@ -426,7 +427,8 @@ public abstract class RaftMember implements 
RaftService.AsyncIface {
    * @return Response.RESPONSE_AGREE when the log is successfully appended or 
Response
    * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
-  private long appendEntries(long prevLogIndex, long prevLogTerm, long 
leaderCommit, List<Log> logs) {
+  private long appendEntries(long prevLogIndex, long prevLogTerm, long 
leaderCommit,
+      List<Log> logs) {
     if (logs.isEmpty()) {
       return Response.RESPONSE_AGREE;
     }
@@ -549,12 +551,8 @@ public abstract class RaftMember implements 
RaftService.AsyncIface {
           if (node.equals(thisNode)) {
             continue;
           }
-          if (!peerMap.containsKey(node)) {
-            logger.warn("{}'s peerMap lost the info of node {}, created a new 
peer for using", name,
-                node);
-            peerMap.put(node, new Peer(logManager.getLastLogIndex()));
-          }
-          if (!peerMap.get(node).isCatchUp()) {
+          Peer peer = peerMap.computeIfAbsent(node, k -> new 
Peer(logManager.getLastLogIndex()));
+          if (!peer.isCatchUp()) {
             logger.warn("{} can't append log to node {} because it needs 
catchUp", name, node);
             continue;
           }
@@ -565,6 +563,7 @@ public abstract class RaftMember implements 
RaftService.AsyncIface {
             handler.setVoteCounter(voteCounter);
             handler.setLeaderShipStale(leaderShipStale);
             handler.setLog(log);
+            handler.setPeer(peer);
             handler.setReceiverTerm(newLeaderTerm);
             try {
               client.appendEntry(request, handler);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
index 1f76006..913317b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
@@ -23,13 +23,19 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.junit.Test;
@@ -60,6 +66,15 @@ public class SerializeLogTest {
     byteBuffer = log.serialize();
     logPrime = LogParser.getINSTANCE().parse(byteBuffer);
     assertEquals(log, logPrime);
+
+    log = new PhysicalPlanLog(new CreateTimeSeriesPlan(new 
Path("root.applyMeta"
+        + ".s1"), TSDataType.DOUBLE, TSEncoding.RLE, CompressionType.SNAPPY,
+        new HashMap<String, String>() {{
+          put("MAX_POINT_NUMBER", "100");
+        }}, Collections.emptyMap(), Collections.emptyMap(), null));
+    byteBuffer = log.serialize();
+    logPrime = LogParser.getINSTANCE().parse(byteBuffer);
+    assertEquals(log, logPrime);
   }
 
   @Test
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index f936710..957eaf2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -70,6 +70,7 @@ import 
org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 import org.apache.iotdb.cluster.server.DataClusterServer;
+import org.apache.iotdb.cluster.server.Peer;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
@@ -392,10 +393,19 @@ public class MetaGroupMemberTest extends MemberTest {
         try {
           Thread.sleep(100);
           dummyResponse.set(Response.RESPONSE_AGREE);
+          DataGroupMember member = testMetaMember.getLocalDataMember(
+              
testMetaMember.getPartitionTable().routeToHeaderByTime(TestUtils.getTestSg(0), 
0));
+          for (Peer peer : member.getPeerMap().values()) {
+            peer.setCatchUp(true);
+          }
+          for (Peer peer : testMetaMember.getPeerMap().values()) {
+            peer.setCatchUp(true);
+          }
         } catch (InterruptedException e) {
           // ignore
         }
       }).start();
+
       testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true);
       assertTrue(processor.getWorkSequenceTsFileProcessors().isEmpty());
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index ce719b7..55a6bd3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -19,11 +19,14 @@
 
 package org.apache.iotdb.tsfile.utils;
 
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.BINARY;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.BOOLEAN;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.DOUBLE;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.FLOAT;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.INTEGER;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.LONG;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.NULL;
+import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.STRING;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -31,10 +34,17 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
-
-import static 
org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.*;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 
 /**
  * ConverterUtils is a utility class. It provide conversion between normal 
datatype and byte array.
@@ -83,7 +93,7 @@ public class ReadWriteIOUtils {
    * read bytes array in given size
    *
    * @param buffer buffer
-   * @param size size
+   * @param size   size
    * @return bytes array
    */
   public static byte[] readBytes(ByteBuffer buffer, int size) {
@@ -123,16 +133,16 @@ public class ReadWriteIOUtils {
   public static int write(Map<String, String> map, DataOutputStream stream) 
throws IOException {
     int length = 0;
     byte[] bytes;
-    stream.write(map.size());
+    stream.writeInt(map.size());
     length += 4;
     for (Entry<String, String> entry : map.entrySet()) {
       bytes = entry.getKey().getBytes();
-      stream.write(bytes.length);
+      stream.writeInt(bytes.length);
       length += 4;
       stream.write(bytes);
       length += bytes.length;
       bytes = entry.getValue().getBytes();
-      stream.write(bytes.length);
+      stream.writeInt(bytes.length);
       length += 4;
       stream.write(bytes);
       length += bytes.length;
@@ -141,7 +151,6 @@ public class ReadWriteIOUtils {
   }
 
 
-
   /**
    * write a int value to outputStream according to flag. If flag is true, 
write 1, else write 0.
    */
@@ -224,7 +233,6 @@ public class ReadWriteIOUtils {
   }
 
 
-
   /**
    * write the size (int) of the binary and then the bytes in binary
    */
@@ -329,7 +337,8 @@ public class ReadWriteIOUtils {
   /**
    * write byteBuffer.array to outputStream without capacity.
    */
-  public static int writeWithoutSize(ByteBuffer byteBuffer, OutputStream 
outputStream) throws IOException {
+  public static int writeWithoutSize(ByteBuffer byteBuffer, OutputStream 
outputStream)
+      throws IOException {
     byte[] bytes = byteBuffer.array();
     outputStream.write(bytes);
     return bytes.length;
@@ -624,7 +633,7 @@ public class ReadWriteIOUtils {
   /**
    * read bytes from byteBuffer, this method makes sure that you can read 
length bytes or reach to
    * the end of the buffer.
-   *
+   * <p>
    * read a int + buffer
    */
   public static ByteBuffer readByteBufferWithSelfDescriptionLength(ByteBuffer 
buffer) {
@@ -779,8 +788,8 @@ public class ReadWriteIOUtils {
 
 
   /**
-   * to check whether the byte buffer is reach the magic string
-   * this method doesn't change the position of the byte buffer
+   * to check whether the byte buffer is reach the magic string this method 
doesn't change the
+   * position of the byte buffer
    *
    * @param byteBuffer byte buffer
    * @return whether the byte buffer is reach the magic string
@@ -793,8 +802,8 @@ public class ReadWriteIOUtils {
   }
 
   /**
-   * to check whether the inputStream is reach the magic string
-   * this method doesn't change the position of the inputStream
+   * to check whether the inputStream is reach the magic string this method 
doesn't change the
+   * position of the inputStream
    *
    * @param inputStream inputStream
    * @return whether the inputStream is reach the magic string
@@ -808,38 +817,38 @@ public class ReadWriteIOUtils {
   }
 
   public static void writeObject(Object value, DataOutputStream outputStream) {
-      try {
-        if (value instanceof Long) {
-          outputStream.write(LONG.ordinal());
-          outputStream.writeLong((Long) value);
-        } else if (value instanceof Double) {
-          outputStream.write(DOUBLE.ordinal());
-          outputStream.writeDouble((Double) value);
-        } else if (value instanceof Integer) {
-          outputStream.write(INTEGER.ordinal());
-          outputStream.writeInt((Integer) value);
-        } else if (value instanceof Float) {
-          outputStream.write(FLOAT.ordinal());
-          outputStream.writeFloat((Float) value);
-        } else if (value instanceof Binary) {
-          outputStream.write(BINARY.ordinal());
-          byte[] bytes = ((Binary) value).getValues();
-          outputStream.writeInt(bytes.length);
-          outputStream.write(bytes);
-        } else if (value instanceof Boolean) {
-          outputStream.write(BOOLEAN.ordinal());
-          outputStream.write(((Boolean) value) ? 1 : 0);
-        } else if (value == null) {
-          outputStream.write(NULL.ordinal());
-        } else {
-          outputStream.write(STRING.ordinal());
-          byte[] bytes = value.toString().getBytes();
-          outputStream.writeInt(bytes.length);
-          outputStream.write(bytes);
-        }
-      } catch (IOException ignored) {
-        // ignored
+    try {
+      if (value instanceof Long) {
+        outputStream.write(LONG.ordinal());
+        outputStream.writeLong((Long) value);
+      } else if (value instanceof Double) {
+        outputStream.write(DOUBLE.ordinal());
+        outputStream.writeDouble((Double) value);
+      } else if (value instanceof Integer) {
+        outputStream.write(INTEGER.ordinal());
+        outputStream.writeInt((Integer) value);
+      } else if (value instanceof Float) {
+        outputStream.write(FLOAT.ordinal());
+        outputStream.writeFloat((Float) value);
+      } else if (value instanceof Binary) {
+        outputStream.write(BINARY.ordinal());
+        byte[] bytes = ((Binary) value).getValues();
+        outputStream.writeInt(bytes.length);
+        outputStream.write(bytes);
+      } else if (value instanceof Boolean) {
+        outputStream.write(BOOLEAN.ordinal());
+        outputStream.write(((Boolean) value) ? 1 : 0);
+      } else if (value == null) {
+        outputStream.write(NULL.ordinal());
+      } else {
+        outputStream.write(STRING.ordinal());
+        byte[] bytes = value.toString().getBytes();
+        outputStream.writeInt(bytes.length);
+        outputStream.write(bytes);
       }
+    } catch (IOException ignored) {
+      // ignored
+    }
   }
 
   public static Object readObject(ByteBuffer buffer) {

Reply via email to