This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch geely_1.0.1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a5efa3fa01c3e056d8d26fa20f123f2a2e4455ca Author: Alan Choo <[email protected]> AuthorDate: Sun Jan 29 16:09:39 2023 +0800 [IOTDB-5338] WAL buffer flush threshold optimization (#8832) --- .../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 58 +++++++++++++--------- 1 file changed, 35 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java index c1ef5932e4..db2d627ef5 100644 --- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java +++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java @@ -58,6 +58,7 @@ public class WALBuffer extends AbstractWALBuffer { private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class); private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); private static final int HALF_WAL_BUFFER_SIZE = config.getWalBufferSize() / 2; + private static final double FSYNC_BUFFER_RATIO = 0.95; private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity(); /** whether close method is called */ @@ -142,9 +143,9 @@ public class WALBuffer extends AbstractWALBuffer { /** This task serializes WALEntry to workingBuffer and will call fsync at last. 
*/ private class SerializeTask implements Runnable { - private final ByteBufferView byteBufferVew = new ByteBufferView(); + private final ByteBufferView byteBufferView = new ByteBufferView(); private final SerializeInfo info = new SerializeInfo(); - private int batchSize = 0; + private int totalSize = 0; @Override public void run() { @@ -172,20 +173,21 @@ public class WALBuffer extends AbstractWALBuffer { Thread.currentThread().interrupt(); } - // for better fsync performance, sleep a while to enlarge write batch - long fsyncDelay = config.getFsyncWalDelayInMs(); - if (fsyncDelay > 0) { + // try to get more WALEntries with blocking interface to enlarge write batch + while (totalSize < HALF_WAL_BUFFER_SIZE * FSYNC_BUFFER_RATIO) { + WALEntry walEntry = null; try { - Thread.sleep(fsyncDelay); + // for better fsync performance, wait a while to enlarge write batch + walEntry = walEntries.poll(config.getFsyncWalDelayInMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - logger.warn("Interrupted when sleeping a while to enlarge wal write batch."); + logger.warn( + "Interrupted when waiting for taking WALEntry from blocking queue to serialize."); Thread.currentThread().interrupt(); } - } - // try to get more WALEntries with non-blocking interface to enlarge write batch - while (walEntries.peek() != null && batchSize < QUEUE_CAPACITY) { - WALEntry walEntry = walEntries.poll(); + if (walEntry == null) { + break; + } boolean returnFlag = handleWALEntry(walEntry); if (returnFlag) { return; @@ -193,7 +195,7 @@ public class WALBuffer extends AbstractWALBuffer { } // call fsync at last and set fsyncListeners - if (batchSize > 0) { + if (totalSize > 0) { fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info); } } @@ -209,7 +211,6 @@ public class WALBuffer extends AbstractWALBuffer { boolean success = handleInfoEntry(walEntry); if (success) { - ++batchSize; info.fsyncListeners.add(walEntry.getWalFlushListener()); } return false; @@ -221,11 +222,10 @@ public 
class WALBuffer extends AbstractWALBuffer { * @return true if serialization is successful. */ private boolean handleInfoEntry(WALEntry walEntry) { - int size = byteBufferVew.position(); + int size = byteBufferView.position(); try { - walEntry.serialize(byteBufferVew); - size = byteBufferVew.position() - size; - logger.debug("wal entry size is: {}", size); + walEntry.serialize(byteBufferView); + size = byteBufferView.position() - size; } catch (Exception e) { logger.error( "Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e); @@ -245,6 +245,7 @@ public class WALBuffer extends AbstractWALBuffer { currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX; } } + totalSize += size; info.metaData.add(size, searchIndex); return true; } @@ -256,16 +257,20 @@ public class WALBuffer extends AbstractWALBuffer { private boolean handleSignalEntry(WALSignalEntry walSignalEntry) { switch (walSignalEntry.getType()) { case ROLL_WAL_LOG_WRITER_SIGNAL: - logger.debug("Handle roll log writer signal for wal node-{}.", identifier); + if (logger.isDebugEnabled()) { + logger.debug("Handle roll log writer signal for wal node-{}.", identifier); + } info.rollWALFileWriterListener = walSignalEntry.getWalFlushListener(); fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info); return true; case CLOSE_SIGNAL: - logger.debug( - "Handle close signal for wal node-{}, there are {} entries left.", - identifier, - walEntries.size()); - boolean dataExists = batchSize > 0; + if (logger.isDebugEnabled()) { + logger.debug( + "Handle close signal for wal node-{}, there are {} entries left.", + identifier, + walEntries.size()); + } + boolean dataExists = totalSize > 0; if (dataExists) { fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info); } @@ -420,6 +425,13 @@ public class WALBuffer extends AbstractWALBuffer { public void run() { currentWALFileWriter.updateFileStatus(fileStatus); + if (logger.isDebugEnabled()) { + double usedRatio = (double) 
syncingBuffer.position() / syncingBuffer.capacity(); + logger.debug( + "Sync wal buffer, forceFlag: {}, buffer used: {} / {} = {}%", + forceFlag, syncingBuffer.position(), syncingBuffer.capacity(), usedRatio * 100); + } + // flush buffer to os try { currentWALFileWriter.write(syncingBuffer, info.metaData);
