This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr by this push:
     new e02917b  fix synchronized size check
e02917b is described below

commit e02917bfbe9278db818c65850ea6d66de4bc9726
Author: jt <[email protected]>
AuthorDate: Mon Oct 18 09:36:35 2021 +0800

    fix synchronized size check
---
 cluster/distribute-dc.sh                           |  2 +-
 .../org/apache/iotdb/cluster/expr/ExprBench.java   | 14 +++++-
 .../org/apache/iotdb/cluster/expr/ExprMember.java  | 55 +++++++++++++++++----
 .../org/apache/iotdb/cluster/expr/ExprServer.java  |  3 +-
 .../apache/iotdb/cluster/expr/VotingLogList.java   |  1 +
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  5 +-
 .../org/apache/iotdb/cluster/log/VotingLog.java    |  1 +
 .../handlers/caller/AppendNodeEntryHandler.java    |  3 +-
 .../handlers/caller/LogCatchUpInBatchHandler.java  |  6 ++-
 .../iotdb/cluster/server/member/RaftMember.java    | 57 +++++++++++++---------
 .../apache/iotdb/cluster/server/monitor/Timer.java |  9 +++-
 influxdb-protocol/pom.xml                          |  4 +-
 12 files changed, 113 insertions(+), 47 deletions(-)

diff --git a/cluster/distribute-dc.sh b/cluster/distribute-dc.sh
index 01b5242..279f9ec 100644
--- a/cluster/distribute-dc.sh
+++ b/cluster/distribute-dc.sh
@@ -1,4 +1,4 @@
-src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/*
+src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
 
 ips=(dc15 dc16 dc17 dc18)
 target_lib_path=/home/jt/iotdb_expr/lib
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index 34eb53f..86ec5e6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicLong;
 public class ExprBench {
 
   private AtomicLong requestCounter = new AtomicLong();
+  private AtomicLong latencySum = new AtomicLong();
+  private long maxLatency = 0;
   private int threadNum = 64;
   private int workloadSize = 64 * 1024;
   private SyncClientPool clientPool;
@@ -65,9 +67,15 @@ public class ExprBench {
                 long currRequsetNum = -1;
                 while (true) {
 
+                  long reqLatency = System.nanoTime();
                   try {
                     client.executeNonQueryPlan(request);
                     currRequsetNum = requestCounter.incrementAndGet();
+                    if (currRequsetNum > threadNum * 10) {
+                      reqLatency = System.nanoTime() - reqLatency;
+                      maxLatency = Math.max(maxLatency, reqLatency);
+                      latencySum.addAndGet(reqLatency);
+                    }
                   } catch (TException e) {
                     e.printStackTrace();
                   }
@@ -76,11 +84,13 @@ public class ExprBench {
                     long elapsedTime = System.currentTimeMillis() - startTime;
                     System.out.println(
                         String.format(
-                            "%d %d %f(%f)",
+                            "%d %d %f(%f) %f %f",
                             elapsedTime,
                             currRequsetNum,
                             (currRequsetNum + 0.0) / elapsedTime,
-                            currRequsetNum * workloadSize / (1024.0 * 1024.0) 
/ elapsedTime));
+                            currRequsetNum * workloadSize / (1024.0 * 1024.0) 
/ elapsedTime,
+                            maxLatency / 1000.0,
+                            (latencySum.get() + 0.0) / currRequsetNum));
                   }
 
                   if (currRequsetNum >= maxRequestNum) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
index 56b2d08..9e07028 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -59,10 +59,11 @@ public class ExprMember extends MetaGroupMember {
   public static boolean bypassRaft = false;
   public static boolean useSlidingWindow = false;
 
-  private int windowSize = 10000;
-  private Log[] logWindow = new Log[windowSize];
+  private int windowCapacity = 10000;
+  private int windowLength = 0;
+  private Log[] logWindow = new Log[windowCapacity];
   private long firstPosPrevIndex = 0;
-  private long[] prevTerms = new long[windowSize];
+  private long[] prevTerms = new long[windowCapacity];
 
   private ExecutorService bypassPool;
 
@@ -163,16 +164,19 @@ public class ExprMember extends MetaGroupMember {
     // check the next entry
     Log log = logWindow[pos];
     boolean nextMismatch = false;
-    if (pos < windowSize - 1) {
+    if (pos < windowCapacity - 1) {
       long nextPrevTerm = prevTerms[pos + 1];
       if (nextPrevTerm != log.getCurrLogTerm()) {
         nextMismatch = true;
       }
     }
     if (nextMismatch) {
-      for (int i = pos + 1; i < windowSize; i++) {
+      for (int i = pos + 1; i < windowCapacity; i++) {
         if (logWindow[i] != null) {
           logWindow[i] = null;
+          if (i == windowLength - 1) {
+            windowLength = pos + 1;
+          }
         } else {
           break;
         }
@@ -193,7 +197,7 @@ public class ExprMember extends MetaGroupMember {
     long windowPrevLogTerm = prevTerms[0];
 
     int flushPos = 0;
-    for (; flushPos < windowSize; flushPos++) {
+    for (; flushPos < windowCapacity; flushPos++) {
       if (logWindow[flushPos] == null) {
         break;
       }
@@ -204,10 +208,10 @@ public class ExprMember extends MetaGroupMember {
     long success =
         logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, 
leaderCommit, logs);
     if (success != -1) {
-      System.arraycopy(logWindow, flushPos, logWindow, 0, windowSize - 
flushPos);
-      System.arraycopy(prevTerms, flushPos, prevTerms, 0, windowSize - 
flushPos);
+      System.arraycopy(logWindow, flushPos, logWindow, 0, windowCapacity - 
flushPos);
+      System.arraycopy(prevTerms, flushPos, prevTerms, 0, windowCapacity - 
flushPos);
       for (int i = 1; i <= flushPos; i++) {
-        logWindow[windowSize - i] = null;
+        logWindow[windowCapacity - i] = null;
       }
     }
     firstPosPrevIndex = logManager.getLastLogIndex();
@@ -217,6 +221,32 @@ public class ExprMember extends MetaGroupMember {
     return success;
   }
 
+  protected AppendEntryResult appendEntries(
+      long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
+    if (!useSlidingWindow) {
+      return super.appendEntries(prevLogIndex, prevLogTerm, leaderCommit, 
logs);
+    }
+
+    if (logs.isEmpty()) {
+      return new 
AppendEntryResult(Response.RESPONSE_AGREE).setHeader(getHeader());
+    }
+
+    AppendEntryResult result = null;
+    for (Log log : logs) {
+      result = appendEntry(prevLogIndex, prevLogTerm, leaderCommit, log);
+
+      if (result.status != Response.RESPONSE_AGREE
+          && result.status != Response.RESPONSE_STRONG_ACCEPT
+          && result.status != Response.RESPONSE_WEAK_ACCEPT) {
+        return result;
+      }
+      prevLogIndex = log.getCurrLogIndex();
+      prevLogTerm = log.getCurrLogTerm();
+    }
+
+    return result;
+  }
+
   protected AppendEntryResult appendEntry(
       long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
     if (!useSlidingWindow) {
@@ -235,16 +265,21 @@ public class ExprMember extends MetaGroupMember {
         result.status = Response.RESPONSE_STRONG_ACCEPT;
         result.setLastLogIndex(logManager.getLastLogIndex());
         result.setLastLogTerm(logManager.getLastLogTerm());
-      } else if (windowPos < windowSize) {
+      } else if (windowPos < windowCapacity) {
         // the new entry falls into the window
         logWindow[windowPos] = log;
         prevTerms[windowPos] = prevLogTerm;
+        if (windowLength < windowPos + 1) {
+          windowLength = windowPos + 1;
+        }
         checkLog(windowPos);
         if (windowPos == 0) {
           appendedPos = flushWindow(result, leaderCommit);
         } else {
           result.status = Response.RESPONSE_WEAK_ACCEPT;
         }
+
+        Statistic.RAFT_WINDOW_LENGTH.add(windowLength);
       } else {
         result.setStatus(Response.RESPONSE_LOG_MISMATCH);
         result.setHeader(getHeader());
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
index ae0516d..4381746 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
@@ -84,7 +84,8 @@ public class ExprServer extends MetaClusterServer {
     ClusterDescriptor.getInstance().getConfig().setInternalMetaPort(port);
     ClusterDescriptor.getInstance().getConfig().setInternalIp(ip);
     
ClusterDescriptor.getInstance().getConfig().setEnableRaftLogPersistence(false);
-    ClusterDescriptor.getInstance().getConfig().setUseBatchInLogCatchUp(false);
+    
ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(50000);
+    // 
ClusterDescriptor.getInstance().getConfig().setUseBatchInLogCatchUp(false);
     RaftMember.USE_LOG_DISPATCHER = true;
     RaftMember.USE_INDIRECT_LOG_DISPATCHER = useIndirectDispatcher;
     LogDispatcher.bindingThreadNum = dispatcherThreadNum;
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
index 804f255..149e726 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
@@ -75,6 +75,7 @@ public class VotingLogList {
       List<VotingLog> acceptedLogs = logList.subList(0, lastEntryIndexToCommit 
+ 1);
       for (VotingLog acceptedLog : acceptedLogs) {
         synchronized (acceptedLog) {
+          acceptedLog.acceptedTime = System.nanoTime();
           acceptedLog.notifyAll();
         }
       }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 7152990..d954ccf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -106,6 +106,7 @@ public class LogDispatcher {
                 return byteBuffer;
               });
     }
+
     for (int i = 0; i < nodeLogQueues.size(); i++) {
       BlockingQueue<SendLogRequest> nodeLogQueue = nodeLogQueues.get(i);
       try {
@@ -250,7 +251,9 @@ public class LogDispatcher {
           synchronized (logBlockingDeque) {
             SendLogRequest poll = logBlockingDeque.take();
             currBatch.add(poll);
-            logBlockingDeque.drainTo(currBatch);
+            if (useBatchInLogCatchUp) {
+              logBlockingDeque.drainTo(currBatch);
+            }
           }
           if (logger.isDebugEnabled()) {
             logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index dc2d9b0..555562a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -26,6 +26,7 @@ public class VotingLog {
   protected Log log;
   protected Set<Integer> stronglyAcceptedNodeIds;
   protected Set<Integer> weaklyAcceptedNodeIds;
+  public long acceptedTime;
 
   public VotingLog(Log log, int groupSize) {
     this.log = log;
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index dc98b62..f865ec8 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -78,7 +78,8 @@ public class AppendNodeEntryHandler implements 
AsyncMethodCallback<AppendEntryRe
       // the request already failed
       return;
     }
-    logger.debug("{}: Append response {} from {}", member.getName(), response, 
receiver);
+    logger.debug(
+        "{}: Append response {} from {} for {}", member.getName(), response, 
receiver, log);
     if (leaderShipStale.get()) {
       // someone has rejected this log because the leadership is stale
       return;
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpInBatchHandler.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpInBatchHandler.java
index 8a979ac..42656d1 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpInBatchHandler.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpInBatchHandler.java
@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_LOG_MISMATCH;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_WEAK_ACCEPT;
 
 public class LogCatchUpInBatchHandler implements 
AsyncMethodCallback<AppendEntryResult> {
 
@@ -50,14 +52,14 @@ public class LogCatchUpInBatchHandler implements 
AsyncMethodCallback<AppendEntry
         "{}: Received a catch-up result size of {} from {}", memberName, 
logs.size(), follower);
 
     long resp = response.status;
-    if (resp == RESPONSE_AGREE) {
+    if (resp == RESPONSE_AGREE || resp == RESPONSE_STRONG_ACCEPT) {
       synchronized (appendSucceed) {
         appendSucceed.set(true);
         appendSucceed.notifyAll();
       }
       logger.debug("{}: Succeeded to send logs, size is {}", memberName, 
logs.size());
 
-    } else if (resp == RESPONSE_LOG_MISMATCH) {
+    } else if (resp == RESPONSE_LOG_MISMATCH || resp == RESPONSE_WEAK_ACCEPT) {
       // this is not probably possible
       logger.error(
           "{}: Log mismatch occurred when sending logs, whose size is {}", 
memberName, logs.size());
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 61149d1..564eb98 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -786,7 +786,7 @@ public abstract class RaftMember {
 
   public void setAllNodes(PartitionGroup allNodes) {
     this.allNodes = allNodes;
-    this.votingLogList = new VotingLogList(allNodes.size() / 2 + 1);
+    this.votingLogList = new VotingLogList(allNodes.size() / 2);
   }
 
   public Map<Node, Long> getLastCatchUpResponseTime() {
@@ -1148,37 +1148,40 @@ public abstract class RaftMember {
     // assign term and index to the new log and append it
     SendLogRequest sendLogRequest;
 
+    Log log;
+    if (plan instanceof LogPlan) {
+      try {
+        log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+      } catch (UnknownLogTypeException e) {
+        logger.error("Can not parse LogPlan {}", plan, e);
+        return StatusUtils.PARSE_LOG_ERROR;
+      }
+    } else {
+      log = new PhysicalPlanLog();
+      ((PhysicalPlanLog) log).setPlan(plan);
+    }
+
+    if (log.serialize().capacity() + Integer.BYTES
+        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) 
{
+      logger.error(
+          "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+              + "or reduce the size of requests you send.");
+      return StatusUtils.INTERNAL_ERROR;
+    }
+
     long startTime =
         
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
-    Log log;
     synchronized (logManager) {
       
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.calOperationCostTimeFromStart(
           startTime);
 
-      if (plan instanceof LogPlan) {
-        try {
-          log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
-        } catch (UnknownLogTypeException e) {
-          logger.error("Can not parse LogPlan {}", plan, e);
-          return StatusUtils.PARSE_LOG_ERROR;
-        }
-      } else {
-        log = new PhysicalPlanLog();
-        ((PhysicalPlanLog) log).setPlan(plan);
-        plan.setIndex(logManager.getLastLogIndex() + 1);
-      }
+      plan.setIndex(logManager.getLastLogIndex() + 1);
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
       startTime = 
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
       // just like processPlanLocally,we need to check the size of log
-      if (log.serialize().capacity() + Integer.BYTES
-          >= 
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
-        logger.error(
-            "Log cannot fit into buffer, please increase raft_log_buffer_size;"
-                + "or reduce the size of requests you send.");
-        return StatusUtils.INTERNAL_ERROR;
-      }
+
       // logDispatcher will serialize log, and set log size, and we will use 
the size after it
       logManager.append(log);
       
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
@@ -1603,6 +1606,7 @@ public abstract class RaftMember {
       VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, 
int quorumSize) {
     // wait for the followers to vote
     long startTime = 
Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
+    long nextTimeToPrint = 3000;
     synchronized (log) {
       long waitStart = System.currentTimeMillis();
       long alreadyWait = 0;
@@ -1614,19 +1618,21 @@ public abstract class RaftMember {
           && (!ENABLE_WEAK_ACCEPTANCE
               || (stronglyAcceptedNodeNum + weaklyAcceptedNodeNum < 
quorumSize))) {
         try {
-          log.wait(1000);
+          log.wait(1);
+          logger.debug("{} ends waiting", log);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           logger.warn("Unexpected interruption when sending a log", e);
         }
         alreadyWait = System.currentTimeMillis() - waitStart;
-        if (alreadyWait > 3000) {
+        if (alreadyWait > nextTimeToPrint) {
           logger.info(
               "Still not receive enough votes for {}, strongly accepted {}, 
weakly "
                   + "accepted {}",
               log,
               log.getStronglyAcceptedNodeIds(),
               log.getWeaklyAcceptedNodeIds());
+          nextTimeToPrint *= 2;
         }
         stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
         weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
@@ -1641,6 +1647,9 @@ public abstract class RaftMember {
             alreadyWait);
       }
     }
+    if (log.acceptedTime != 0) {
+      
Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime);
+    }
     
Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.calOperationCostTimeFromStart(startTime);
 
     // a node has a larger term than the local node, so this node is no longer 
a valid leader
@@ -2166,7 +2175,7 @@ public abstract class RaftMember {
    * @return Response.RESPONSE_AGREE when the log is successfully appended or 
Response
    *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
-  private AppendEntryResult appendEntries(
+  protected AppendEntryResult appendEntries(
       long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
     logger.debug(
         "{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 21f5ef8..c0afaa7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -243,18 +243,21 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_WINDOW_LENGTH(RAFT_MEMBER_RECEIVER, "window length", 1, true, ROOT),
+    RAFT_WAIT_AFTER_ACCEPTED(RAFT_MEMBER_SENDER, "wait after accepted", 
TIME_SCALE, true, ROOT),
     RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT);
 
     String className;
     String blockName;
     AtomicLong sum = new AtomicLong(0);
     AtomicLong counter = new AtomicLong(0);
+    long max;
     double scale;
     boolean valid;
     int level;
     Statistic parent;
     List<Statistic> children = new ArrayList<>();
-    long warningThreshold = 3 * 1000 * 1000 * 1000L;
+    long warningThreshold = 30 * 1000 * 1000 * 1000L;
 
     Statistic(String className, String blockName, double scale, boolean valid, 
Statistic parent) {
       this.className = className;
@@ -274,6 +277,7 @@ public class Timer {
       if (ENABLE_INSTRUMENTING) {
         sum.addAndGet(val);
         counter.incrementAndGet();
+        max = Math.max(max, val);
       }
     }
 
@@ -308,6 +312,7 @@ public class Timer {
     public void reset() {
       sum.set(0);
       counter.set(0);
+      max = 0;
     }
 
     /** WARN: no current safety guarantee. */
@@ -322,7 +327,7 @@ public class Timer {
       double s = sum.get() / scale;
       long cnt = counter.get();
       double avg = s / cnt;
-      return String.format("%s - %s: %.2f, %d, %.2f", className, blockName, s, 
cnt, avg);
+      return String.format("%s - %s: %.2f, %d, %.2f, %d", className, 
blockName, s, cnt, avg, max);
     }
   }
 
diff --git a/influxdb-protocol/pom.xml b/influxdb-protocol/pom.xml
index 1d74bf1..caf4399 100644
--- a/influxdb-protocol/pom.xml
+++ b/influxdb-protocol/pom.xml
@@ -19,9 +19,7 @@
     under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.iotdb</groupId>

Reply via email to