This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_new by this push:
new 16d3ca4 [IOTDB-723] add raft log delete mechanism
new 30cbbd3 Merge pull request #1339 from neuyilan/cluster_new_IOTDB-723
16d3ca4 is described below
commit 16d3ca4e0ddf0aa29468f140cce780454bbaf400
Author: HouliangQi <[email protected]>
AuthorDate: Tue Jun 9 19:16:47 2020 +0800
[IOTDB-723] add raft log delete mechanism
---
.../resources/conf/iotdb-cluster.properties | 6 ++
.../apache/iotdb/cluster/config/ClusterConfig.java | 26 ++++++++
.../iotdb/cluster/config/ClusterDescriptor.java | 7 +++
.../cluster/log/manage/CommittedEntryManager.java | 9 +++
.../iotdb/cluster/log/manage/RaftLogManager.java | 60 +++++++++++++++++-
.../serializable/SyncLogDequeSerializer.java | 71 ++++++++++++++++++----
.../cluster/log/manage/RaftLogManagerTest.java | 48 ++++++++++++++-
7 files changed, 212 insertions(+), 15 deletions(-)
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index ef7c26b..97fd987 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -61,3 +61,9 @@ MAX_REMOVED_LOG_SIZE=134217728
# whether to use batch append entries in log catch up
USE_BATCH_IN_CATCH_UP=true
+# max number of committed logs to be saved
+MAX_NUMBER_OF_LOGS=100
+
+# interval, in seconds, between checks for deleting committed logs
+LOG_DELETION_CHECK_INTERVAL_SECOND=3600
+
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 346c372..c0242f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -53,6 +53,16 @@ public class ClusterConfig {
private boolean useBatchInLogCatchUp = true;
+ /**
+ * max number of committed logs to be saved
+ */
+ private int maxNumberOfLogs = 100;
+
+ /**
+ * interval, in seconds, between checks for deleting committed logs
+ */
+ private int logDeleteCheckIntervalSecond = 3600;
+
public boolean isUseBatchInLogCatchUp() {
return useBatchInLogCatchUp;
}
@@ -148,4 +158,20 @@ public class ClusterConfig {
public void setQueryTimeoutInSec(int queryTimeoutInSec) {
this.queryTimeoutInSec = queryTimeoutInSec;
}
+
+ public int getMaxNumberOfLogs() {
+ return maxNumberOfLogs;
+ }
+
+ public void setMaxNumberOfLogs(int maxNumberOfLogs) {
+ this.maxNumberOfLogs = maxNumberOfLogs;
+ }
+
+ public int getLogDeleteCheckIntervalSecond() {
+ return logDeleteCheckIntervalSecond;
+ }
+
+ public void setLogDeleteCheckIntervalSecond(int
logDeleteCheckIntervalSecond) {
+ this.logDeleteCheckIntervalSecond = logDeleteCheckIntervalSecond;
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 2ddab3d..6491d8d 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -216,6 +216,13 @@ public class ClusterDescriptor {
config.setUseBatchInLogCatchUp(Boolean.parseBoolean(properties.getProperty(
"USE_BATCH_IN_CATCH_UP",
String.valueOf(config.isUseBatchInLogCatchUp()))));
+ config.setMaxNumberOfLogs(Integer.parseInt(
+ properties.getProperty("MAX_NUMBER_OF_LOGS",
String.valueOf(config.getMaxNumberOfLogs()))));
+
+ config.setLogDeleteCheckIntervalSecond(Integer.parseInt(properties
+ .getProperty("LOG_DELETION_CHECK_INTERVAL_SECOND",
+ String.valueOf(config.getLogDeleteCheckIntervalSecond()))));
+
String seedUrls = properties.getProperty("SEED_NODES");
if (seedUrls != null) {
List<String> urlList = getSeedUrlList(seedUrls);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
index ea1166d..bab681d 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
@@ -94,6 +94,15 @@ public class CommittedEntryManager {
}
/**
+ * Return the number of entries, computed as lastIndex - firstIndex + 1.
+ *
+ * @return the number of entries
+ */
+ public long getTotalSize() {
+ return getLastIndex() - getFirstIndex() + 1;
+ }
+
+ /**
* Return the entry's term for given index. Note that the called should
ensure index <=
* entries[entries.size()-1].index.
*
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 95fe0ad..f4c766a 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -22,6 +22,11 @@ package org.apache.iotdb.cluster.log.manage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.EntryCompactedException;
import org.apache.iotdb.cluster.exception.EntryUnavailableException;
import org.apache.iotdb.cluster.exception.GetEntriesWrongParametersException;
@@ -50,6 +55,22 @@ public class RaftLogManager {
private long commitIndex;
private LogApplier logApplier;
+
+ private ScheduledExecutorService executorService;
+
+
+ /**
+ * max number of committed logs to be saved
+ */
+ private int maxNumberOfLogs =
ClusterDescriptor.getInstance().getConfig().getMaxNumberOfLogs();
+
+ /**
+ * interval, in seconds, between checks for deleting committed logs
+ */
+ private int logDeleteCheckIntervalSecond =
ClusterDescriptor.getInstance().getConfig()
+ .getLogDeleteCheckIntervalSecond();
+
+
public RaftLogManager(StableEntryManager stableEntryManager, LogApplier
applier) {
this.logApplier = applier;
this.setCommittedEntryManager(new CommittedEntryManager());
@@ -63,6 +84,14 @@ public class RaftLogManager {
this.setUnCommittedEntryManager(new UnCommittedEntryManager(last + 1));
// must have applied entry [compactIndex,last] to state machine
this.commitIndex = last;
+
+ executorService = new ScheduledThreadPoolExecutor(1,
+ new
BasicThreadFactory.Builder().namingPattern("raft-log-delete-%d").daemon(true)
+ .build());
+ executorService
+ .scheduleAtFixedRate(this::checkDeleteLog,
logDeleteCheckIntervalSecond,
+ logDeleteCheckIntervalSecond,
+ TimeUnit.SECONDS);
}
public Snapshot getSnapshot() {
@@ -364,7 +393,8 @@ public class RaftLogManager {
*
* @param newCommitIndex request commitIndex
*/
- public void commitTo(long newCommitIndex, boolean ignoreExecutionExceptions)
throws LogExecutionException {
+ public void commitTo(long newCommitIndex, boolean ignoreExecutionExceptions)
+ throws LogExecutionException {
if (commitIndex < newCommitIndex) {
long lo = getUnCommittedEntryManager().getFirstUnCommittedIndex();
long hi = newCommitIndex + 1;
@@ -411,11 +441,12 @@ public class RaftLogManager {
/**
* Used by commitTo to apply newly committed entries
*
- * @param entries applying entries
+ * @param entries applying entries
* @param ignoreExecutionException when set to true, exceptions during
applying the logs are
* ignored, otherwise they are reported to
the upper level
*/
- protected void applyEntries(List<Log> entries, boolean
ignoreExecutionException) throws LogExecutionException {
+ protected void applyEntries(List<Log> entries, boolean
ignoreExecutionException)
+ throws LogExecutionException {
for (Log entry : entries) {
try {
logApplier.apply(entry);
@@ -488,6 +519,11 @@ public class RaftLogManager {
this.commitIndex = last;
}
+ @TestOnly
+ public void setMaxNumberOfLogs(int maxNumberOfLogs) {
+ this.maxNumberOfLogs = maxNumberOfLogs;
+ }
+
public void close() {
getStableEntryManager().close();
}
@@ -517,4 +553,22 @@ public class RaftLogManager {
public void setStableEntryManager(StableEntryManager stableEntryManager) {
this.stableEntryManager = stableEntryManager;
}
+
+
+  /**
+   * Check whether the committed log holds more than maxNumberOfLogs entries and, if so,
+   * compact the oldest entries so that only maxNumberOfLogs remain. The same compact index
+   * is then passed to the stable entry manager so it can drop its compacted entries too.
+   * A failed compaction is logged and otherwise ignored.
+   */
+  public void checkDeleteLog() {
+    // read the size once so the threshold check and the removal count agree
+    long totalSize = committedEntryManager.getTotalSize();
+    if (totalSize <= maxNumberOfLogs) {
+      return;
+    }
+    long removeSize = totalSize - maxNumberOfLogs;
+    long compactIndex = committedEntryManager.getDummyIndex() + removeSize;
+    try {
+      getCommittedEntryManager().compactEntries(compactIndex);
+      getStableEntryManager().removeCompactedEntries(compactIndex);
+    } catch (EntryUnavailableException e) {
+      // pass the exception itself as the last argument so the stack trace is kept
+      logger.error("regular compact log entries failed, error={}", e.getMessage(), e);
+    }
+  }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
index 7bf6e66..a6bd7c5 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
@@ -36,6 +36,8 @@ import java.util.Date;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.HardState;
@@ -80,6 +82,12 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
private VersionController versionController;
/**
+ * the lock used when modifying the logSizeDeque
+ */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+
+ /**
* for log tools
*
* @param logPath log dir path
@@ -126,10 +134,12 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
this.maxRemovedLogSize = maxRemovedLogSize;
}
+ @Override
public List<Log> getAllEntries() {
return recoverLog();
}
+ @Override
public void append(List<Log> entries) {
Log entry = entries.get(entries.size() - 1);
meta.setCommitLogIndex(entry.getCurrLogIndex());
@@ -139,17 +149,32 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
append(entries, meta);
}
+ @Override
public void setHardStateAndFlush(HardState state) {
this.state = state;
serializeMeta(meta);
}
+ @Override
public HardState getHardState() {
return state;
}
+  /**
+   * Remove the oldest persisted log sizes so that only the entries after {@code index}, up to
+   * the committed log index recorded in meta, remain in logSizeDeque.
+   *
+   * @param index the compact index; nothing is removed when it is not below the committed
+   *              log index, or when fewer entries are held than would remain after compaction
+   */
+  @Override
public void removeCompactedEntries(long index) {
- // TODO-Cluster: decide a log discarding policy
+    // number of entries that must remain after compaction
+    long distance = meta.getCommitLogIndex() - index;
+    if (distance <= 0) {
+      logger
+          .info("compact ({}) is out of bound lastIndex ({})", index, meta.getCommitLogIndex());
+      return;
+    }
+    int currentSize = logSizeDeque.size();
+    if (distance > currentSize) {
+      logger.info(
+          "entries before request index ({}) have been compacted", index);
+      // nothing left to remove; falling through would yield a negative removal count
+      return;
+    }
+
+    // distance <= currentSize here, so the narrowing cast cannot overflow
+    int numToRemove = currentSize - (int) distance;
+    removeFirst(numToRemove);
}
public Deque<Integer> getLogSizeDeque() {
@@ -293,20 +318,32 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
logger.error("Error in appending log {} ", log, e);
}
- logSizeDeque.addLast(totalSize);
+ lock.writeLock().lock();
+ try {
+ logSizeDeque.addLast(totalSize);
+ } finally {
+ lock.writeLock().unlock();
+ }
+
serializeMeta(meta);
+
}
public void append(List<Log> logs, LogManagerMeta meta) {
int bufferSize = 0;
List<ByteBuffer> bufferList = new ArrayList<>(logs.size());
- for (Log log : logs) {
- ByteBuffer data = log.serialize();
- int size = data.capacity() + Integer.BYTES;
- logSizeDeque.addLast(size);
- bufferSize += size;
+ lock.writeLock().lock();
+ try {
+ for (Log log : logs) {
+ ByteBuffer data = log.serialize();
+ int size = data.capacity() + Integer.BYTES;
+ logSizeDeque.addLast(size);
+ bufferSize += size;
- bufferList.add(data);
+ bufferList.add(data);
+ }
+ } finally {
+ lock.writeLock().unlock();
}
ByteBuffer finalBuffer = ByteBuffer.allocate(bufferSize);
@@ -343,9 +380,15 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
}
int size = 0;
- for (int i = 0; i < count; i++) {
- size += logSizeDeque.removeLast();
+ lock.writeLock().lock();
+ try {
+ for (int i = 0; i < count; i++) {
+ size += logSizeDeque.removeLast();
+ }
+ } finally {
+ lock.writeLock().unlock();
}
+
// truncate file
while (size > 0) {
File currentLogFile = getCurrentLogFile();
@@ -454,7 +497,12 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
logger.error("Unknown log detected ", e);
}
- logSizeDeque.addLast(totalSize);
+ lock.writeLock().lock();
+ try {
+ logSizeDeque.addLast(totalSize);
+ } finally {
+ lock.writeLock().unlock();
+ }
return log;
}
@@ -527,6 +575,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
logger.debug("Serialized log meta into {}", tempMetaFile.getPath());
}
+ @Override
public void close() {
try {
if (currentLogOutputStream != null) {
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
index bdea25e..a779b1e 100644
---
a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
+++
b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.exception.EntryCompactedException;
import org.apache.iotdb.cluster.exception.EntryUnavailableException;
import org.apache.iotdb.cluster.exception.GetEntriesWrongParametersException;
+import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
@@ -619,7 +620,8 @@ public class RaftLogManagerTest {
try {
List<Log> entries = instance.getEntries(1, Integer.MAX_VALUE);
assertEquals(test.testEntries, entries);
- assertEquals(test.testOffset,
instance.getUnCommittedEntryManager().getFirstUnCommittedIndex());
+ assertEquals(test.testOffset,
+ instance.getUnCommittedEntryManager().getFirstUnCommittedIndex());
} catch (Exception e) {
fail("An unexpected exception was thrown.");
} finally {
@@ -1011,4 +1013,48 @@ public class RaftLogManagerTest {
instance.close();
}
}
+
+  /**
+   * Commits more logs than maxNumberOfLogs, runs checkDeleteLog() once and verifies that both
+   * the committed entry manager and the serializer keep exactly maxNumberOfLogs entries, and
+   * that a freshly opened serializer recovers the newest maxNumberOfLogs logs.
+   */
+  @Test
+  public void testCheckDeleteLog() {
+    int maxNumberOfLogs = 100;
+    int totalLogs = 130;
+
+    SyncLogDequeSerializer syncLogDequeSerializer = new SyncLogDequeSerializer(testIdentifier);
+    syncLogDequeSerializer.setMaxRemovedLogSize(10);
+
+    CommittedEntryManager committedEntryManager = new CommittedEntryManager();
+    RaftLogManager raftLogManager = new RaftLogManager(committedEntryManager,
+        syncLogDequeSerializer, logApplier);
+    raftLogManager.setMaxNumberOfLogs(maxNumberOfLogs);
+
+    List<Log> testLogs1 = TestUtils.prepareNodeLogs(totalLogs);
+    try {
+      raftLogManager.append(testLogs1);
+      raftLogManager.commitTo(testLogs1.get(testLogs1.size() - 1).getCurrLogIndex(), false);
+
+      assertEquals(totalLogs, committedEntryManager.getTotalSize());
+      assertEquals(totalLogs, syncLogDequeSerializer.getLogSizeDeque().size());
+
+      raftLogManager.checkDeleteLog();
+
+      assertEquals(maxNumberOfLogs, committedEntryManager.getTotalSize());
+      assertEquals(maxNumberOfLogs, syncLogDequeSerializer.getLogSizeDeque().size());
+    } catch (LogExecutionException e) {
+      fail("An unexpected exception was thrown: " + e);
+    } finally {
+      // always release the manager, even when an assertion above fails
+      raftLogManager.close();
+    }
+
+    // recovery
+    syncLogDequeSerializer = new SyncLogDequeSerializer(testIdentifier);
+    try {
+      List<Log> logs = syncLogDequeSerializer.recoverLog();
+      assertEquals(maxNumberOfLogs, logs.size());
+      // the oldest (totalLogs - maxNumberOfLogs) logs were deleted, so recovery starts after them
+      int firstKept = totalLogs - maxNumberOfLogs;
+      for (int i = 0; i < maxNumberOfLogs; i++) {
+        assertEquals(testLogs1.get(i + firstKept), logs.get(i));
+      }
+    } finally {
+      syncLogDequeSerializer.close();
+    }
+  }
}