This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch testcontainer in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7a26c92649e9e3487c10a9bf9e4540d40f3759a4 Author: HouliangQi <[email protected]> AuthorDate: Tue Apr 13 19:08:00 2021 +0800 Minor improve the concurrency problem caused by the checking apply thread and the compact entries thread (#3001) --- .../org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java | 4 ++-- .../iotdb/cluster/log/manage/CommittedEntryManager.java | 12 +++++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java index e16f99c..dcb287d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java @@ -266,7 +266,7 @@ public class LogCatchUpTask implements Callable<Boolean> { } // do append entries if (logger.isInfoEnabled()) { - logger.info("{}: sending {} logs to {}", raftMember.getName(), node, logList.size()); + logger.info("{}: sending {} logs to {}", raftMember.getName(), logList.size(), node); } if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { abort = !appendEntriesAsync(logList, request); @@ -274,7 +274,7 @@ public class LogCatchUpTask implements Callable<Boolean> { abort = !appendEntriesSync(logList, request); } if (!abort && logger.isInfoEnabled()) { - logger.info("{}: sent {} logs to {}", raftMember.getName(), node, logList.size()); + logger.info("{}: sent {} logs to {}", raftMember.getName(), logList.size(), node); } logList.clear(); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java index c08334e..e86df85 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java @@ -208,13 +208,19 @@ public class CommittedEntryManager { throw new EntryUnavailableException(compactIndex, getLastIndex()); } int index = (int) (compactIndex - dummyIndex); + for (int i = 1; i <= index; i++) { + entryTotalMemSize -= entries.get(i).getByteSize(); + } + // The following two lines of code should be tightly linked, + // because the check apply thread will read the entry also, and there will be concurrency + // problems, + // but please rest assured that we have done concurrency security check in the check apply + // thread. + // They are put together just to reduce the probability of concurrency. entries.set( 0, new EmptyContentLog( entries.get(index).getCurrLogIndex(), entries.get(index).getCurrLogTerm())); - for (int i = 1; i <= index; i++) { - entryTotalMemSize -= entries.get(i).getByteSize(); - } entries.subList(1, index + 1).clear(); }
