This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5bbe062515c Pipe: fix recovering huge entry from wal with wrong
position bug & fix multi pipes sharing the same ByteBuffer (#11285)
5bbe062515c is described below
commit 5bbe062515cad88d98e0c47bcfb22178041f6cd6
Author: Caideyipi <[email protected]>
AuthorDate: Thu Oct 12 16:17:28 2023 +0800
Pipe: fix recovering huge entry from wal with wrong position bug & fix
multi pipes sharing the same ByteBuffer (#11285)
---
.../dataregion/wal/buffer/WALBuffer.java | 30 +++-
.../storageengine/dataregion/wal/node/WALNode.java | 5 +
.../dataregion/wal/utils/WALInsertNodeCache.java | 19 ++-
.../wal/utils/WALInsertNodeCacheTest.java | 155 +++++++++++++++------
4 files changed, 165 insertions(+), 44 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
index a87ee1422d0..0856e4622ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.storageengine.dataregion.wal.buffer;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
@@ -73,6 +74,8 @@ public class WALBuffer extends AbstractWALBuffer {
private final Lock buffersLock = new ReentrantLock();
// condition to guarantee correctness of switching buffers
private final Condition idleBufferReadyCondition =
buffersLock.newCondition();
+  // writer size captured after the last successful fsync; used as the starting offset when
+  // recording the positions of the entries covered by the next fsync
+ private long lastFsyncPosition;
// region these variables should be protected by buffersLock
/** two buffers switch between three statuses (there is always 1 buffer
working). */
// buffer in working status, only updated by serializeThread
@@ -126,6 +129,30 @@ public class WALBuffer extends AbstractWALBuffer {
}
}
+ @TestOnly
+ public void setBufferSize(int size) {
+ buffersLock.lock();
+ try {
+ if (workingBuffer != null) {
+ MmapUtil.clean((MappedByteBuffer) workingBuffer);
+ }
+ if (idleBuffer != null) {
+ MmapUtil.clean((MappedByteBuffer) workingBuffer);
+ }
+ if (syncingBuffer != null) {
+ MmapUtil.clean((MappedByteBuffer) syncingBuffer);
+ }
+ workingBuffer = ByteBuffer.allocateDirect(size / 2);
+ idleBuffer = ByteBuffer.allocateDirect(size / 2);
+ } catch (OutOfMemoryError e) {
+ logger.error("Fail to allocate wal node-{}'s buffer because out of
memory.", identifier, e);
+ close();
+ throw e;
+ } finally {
+ buffersLock.unlock();
+ }
+ }
+
@Override
public void write(WALEntry walEntry) {
if (isClosed) {
@@ -446,7 +473,6 @@ public class WALBuffer extends AbstractWALBuffer {
public void run() {
final long startTime = System.nanoTime();
long walFileVersionId = currentWALFileVersion;
- long position = currentWALFileWriter.size();
currentWALFileWriter.updateFileStatus(fileStatus);
// calculate buffer used ratio
@@ -504,6 +530,7 @@ public class WALBuffer extends AbstractWALBuffer {
// notify all waiting listeners
if (forceSuccess) {
+ long position = lastFsyncPosition;
for (WALFlushListener fsyncListener : info.fsyncListeners) {
fsyncListener.succeed();
if (fsyncListener.getWalEntryHandler() != null) {
@@ -511,6 +538,7 @@ public class WALBuffer extends AbstractWALBuffer {
position += fsyncListener.getWalEntryHandler().getSize();
}
}
+ lastFsyncPosition = currentWALFileWriter.size();
}
WRITING_METRICS.recordWALBufferEntriesCount(info.fsyncListeners.size());
WRITING_METRICS.recordSyncWALBufferCost(System.nanoTime() - startTime,
forceFlag);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 1bfa5dd7b06..546638c9f99 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -939,4 +939,9 @@ public class WALNode implements IWALNode {
CheckpointManager getCheckpointManager() {
return checkpointManager;
}
+
+  /**
+   * Test-only hook: forwards {@code size} to this node's WAL buffer.
+   *
+   * @param size total buffer size handed unchanged to {@code buffer.setBufferSize}
+   */
+  @TestOnly
+  public void setBufferSize(int size) {
+    this.buffer.setBufferSize(size);
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 04fc125fdf7..cc9ab8effc9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -54,7 +54,7 @@ public class WALInsertNodeCache {
// LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>>
lruCache;
- private final boolean isBatchLoadEnabled;
+ private boolean isBatchLoadEnabled;
// ids of all pinned memTables
private final Set<Long> memTablesNeedSearch = ConcurrentHashMap.newKeySet();
@@ -77,6 +77,11 @@ public class WALInsertNodeCache {
return isBatchLoadEnabled;
}
+  /**
+   * Test-only override of the flag that {@code getInsertNode} consults to choose its lookup path.
+   *
+   * @param isBatchLoadEnabled new value of the flag
+   */
+  @TestOnly
+  public void setIsBatchLoadEnabled(final boolean isBatchLoadEnabled) {
+    this.isBatchLoadEnabled = isBatchLoadEnabled;
+  }
+
public InsertNode getInsertNode(WALEntryPosition position) {
final Pair<ByteBuffer, InsertNode> pair =
isBatchLoadEnabled
@@ -88,7 +93,17 @@ public class WALInsertNodeCache {
}
if (pair.getRight() == null) {
- pair.setRight(parse(pair.getLeft()));
+ try {
+ pair.setRight(parse(ByteBuffer.wrap(pair.getLeft().array())));
+ } catch (Exception e) {
+ logger.error(
+ "Parsing failed when recovering insertNode from wal, walFile:{},
position:{}, size:{}, exception:",
+ position.getWalFile(),
+ position.getPosition(),
+ position.getSize(),
+ e);
+ throw e;
+ }
}
return pair.getRight();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
index 18817591195..bcec9915996 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
@@ -34,10 +34,15 @@ import
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -75,62 +80,130 @@ public class WALInsertNodeCacheTest {
}
@Test
- public void testLoadUnsealedWALFile() throws Exception {
+  public void testLoadAfterSyncBuffer() throws IllegalPathException {
+    // Shrinks the WAL buffer to 16 bytes so that writing one entry triggers buffer syncs, then
+    // checks the entry can still be loaded from the position recorded for it. The configured
+    // buffer size is restored in the finally block.
+    try {
+      // Limit the wal buffer size to trigger sync Buffer when writing wal entry
+      walNode.setBufferSize(16);
+      // write memTable
+      IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
+      walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+      InsertRowNode node1 = getInsertRowNode(System.currentTimeMillis());
+      node1.setSearchIndex(1);
+      WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
+      WALEntryPosition position = flushListener.getWalEntryHandler().getWalEntryPosition();
+      // wait until wal flushed
+      walNode.rollWALFile();
+      Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed() && position.canRead());
+      // load by cache
+      assertEquals(node1, cache.getInsertNode(position));
+    } finally {
+      walNode.setBufferSize(config.getWalBufferSize());
+    }
+  }
+
+ /**
+ * Reads the same WAL entry through {@code cache.getInsertNode} from five threads at once; any
+ * thread whose lookup or equality assertion throws sets {@code failure}, which is checked after
+ * all threads have exited.
+ */
+ @Test
+ public void testGetInsertNodeInParallel() throws IllegalPathException {
+ // write memTable
IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
- InsertRowNode node1 = getInsertRowNode(devicePath,
System.currentTimeMillis());
+ InsertRowNode node1 = getInsertRowNode(System.currentTimeMillis());
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode.log(memTable.getMemTableId(),
node1);
WALEntryPosition position =
flushListener.getWalEntryHandler().getWalEntryPosition();
// wait until wal flushed
- while (!walNode.isAllWALEntriesConsumed() || !position.canRead()) {
- Thread.sleep(50);
+ walNode.rollWALFile();
+ Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed() &&
position.canRead());
+ // Test getInsertNode in parallel to detect buffer concurrent problem
+ AtomicBoolean failure = new AtomicBoolean(false);
+ List<Thread> threadList = new ArrayList<>(5);
+ for (int i = 0; i < 5; ++i) {
+ Thread getInsertNodeThread =
+ new Thread(
+ () -> {
+ try {
+ assertEquals(node1, cache.getInsertNode(position));
+ } catch (Throwable e) {
+ // NOTE(review): the Throwable is discarded, so a failure only shows up as the flag;
+ // consider keeping it (e.g. in an AtomicReference) and rethrowing for diagnostics
+ failure.set(true);
+ }
+ });
+ threadList.add(getInsertNodeThread);
+ getInsertNodeThread.start();
}
+ // poll until no reader thread is still alive
+ Awaitility.await()
+ .until(
+ () -> {
+ for (Thread thread : threadList) {
+ if (thread.isAlive()) {
+ return false;
+ }
+ }
+ return true;
+ });
+ assertFalse(failure.get());
+ }
+
+ /**
+ * Loads an entry whose WAL file has not been rolled (presumably still unsealed, per the test
+ * name): unlike the other tests, it waits for the entry to become readable without calling
+ * {@code walNode.rollWALFile()} first.
+ */
+ @Test
+ public void testLoadUnsealedWALFile() throws Exception {
+ IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+ InsertRowNode node1 = getInsertRowNode(System.currentTimeMillis());
+ node1.setSearchIndex(1);
+ WALFlushListener flushListener = walNode.log(memTable.getMemTableId(),
node1);
+ WALEntryPosition position =
flushListener.getWalEntryHandler().getWalEntryPosition();
+ // wait until wal flushed
+ Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed() &&
position.canRead());
// load by cache
assertEquals(node1, cache.getInsertNode(position));
}
@Test
public void testBatchLoad() throws Exception {
- // write memTable1
- IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
- walNode.onMemTableCreated(memTable1, logDirectory + "/" + "fake1.tsfile");
- InsertRowNode node1 = getInsertRowNode(devicePath,
System.currentTimeMillis());
- node1.setSearchIndex(1);
- WALFlushListener flushListener1 = walNode.log(memTable1.getMemTableId(),
node1);
- WALEntryPosition position1 =
flushListener1.getWalEntryHandler().getWalEntryPosition();
- InsertRowNode node2 = getInsertRowNode(devicePath,
System.currentTimeMillis());
- node1.setSearchIndex(2);
- WALFlushListener flushListener2 = walNode.log(memTable1.getMemTableId(),
node2);
- WALEntryPosition position2 =
flushListener2.getWalEntryHandler().getWalEntryPosition();
- // write memTable2
- IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
- walNode.onMemTableCreated(memTable2, logDirectory + "/" + "fake2.tsfile");
- InsertRowNode node3 = getInsertRowNode(devicePath,
System.currentTimeMillis());
- node1.setSearchIndex(3);
- WALFlushListener flushListener3 = walNode.log(memTable2.getMemTableId(),
node3);
- WALEntryPosition position3 =
flushListener3.getWalEntryHandler().getWalEntryPosition();
- // wait until wal flushed
- walNode.rollWALFile();
- while (!walNode.isAllWALEntriesConsumed() || !position3.canRead()) {
- Thread.sleep(50);
+ // Enable batch load
+ boolean oldIsBatchLoadEnabled =
WALInsertNodeCache.getInstance().isBatchLoadEnabled();
+ WALInsertNodeCache.getInstance().setIsBatchLoadEnabled(true);
+ try {
+ // write memTable1
+ IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode.onMemTableCreated(memTable1, logDirectory + "/" +
"fake1.tsfile");
+ InsertRowNode node1 = getInsertRowNode(System.currentTimeMillis());
+ node1.setSearchIndex(1);
+ WALFlushListener flushListener1 = walNode.log(memTable1.getMemTableId(),
node1);
+ WALEntryPosition position1 =
flushListener1.getWalEntryHandler().getWalEntryPosition();
+ InsertRowNode node2 = getInsertRowNode(System.currentTimeMillis());
+ node1.setSearchIndex(2);
+ WALFlushListener flushListener2 = walNode.log(memTable1.getMemTableId(),
node2);
+ WALEntryPosition position2 =
flushListener2.getWalEntryHandler().getWalEntryPosition();
+ // write memTable2
+ IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode.onMemTableCreated(memTable2, logDirectory + "/" +
"fake2.tsfile");
+ InsertRowNode node3 = getInsertRowNode(System.currentTimeMillis());
+ node1.setSearchIndex(3);
+ WALFlushListener flushListener3 = walNode.log(memTable2.getMemTableId(),
node3);
+ WALEntryPosition position3 =
flushListener3.getWalEntryHandler().getWalEntryPosition();
+ // wait until wal flushed
+ walNode.rollWALFile();
+ Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed() &&
position3.canRead());
+ // check batch load memTable1
+ cache.addMemTable(memTable1.getMemTableId());
+ assertEquals(node1, cache.getInsertNode(position1));
+ assertTrue(cache.contains(position1));
+ assertEquals(
+ WALInsertNodeCache.getInstance().isBatchLoadEnabled(),
cache.contains(position2));
+ assertFalse(cache.contains(position3));
+ // check batch load none
+ cache.removeMemTable(memTable1.getMemTableId());
+ cache.clear();
+ assertEquals(node1, cache.getInsertNode(position1));
+ assertTrue(cache.contains(position1));
+ assertFalse(cache.contains(position2));
+ assertFalse(cache.contains(position3));
+ } finally {
+
WALInsertNodeCache.getInstance().setIsBatchLoadEnabled(oldIsBatchLoadEnabled);
}
- // check batch load memTable1
- cache.addMemTable(memTable1.getMemTableId());
- assertEquals(node1, cache.getInsertNode(position1));
- assertTrue(cache.contains(position1));
- assertEquals(WALInsertNodeCache.getInstance().isBatchLoadEnabled(),
cache.contains(position2));
- assertFalse(cache.contains(position3));
- // check batch load none
- cache.removeMemTable(memTable1.getMemTableId());
- cache.clear();
- assertEquals(node1, cache.getInsertNode(position1));
- assertTrue(cache.contains(position1));
- assertFalse(cache.contains(position2));
- assertFalse(cache.contains(position3));
}
- private InsertRowNode getInsertRowNode(String devicePath, long time) throws
IllegalPathException {
+ private InsertRowNode getInsertRowNode(long time) throws
IllegalPathException {
TSDataType[] dataTypes =
new TSDataType[] {
TSDataType.DOUBLE,
@@ -152,7 +225,7 @@ public class WALInsertNodeCacheTest {
InsertRowNode node =
new InsertRowNode(
new PlanNodeId(""),
- new PartialPath(devicePath),
+ new PartialPath(WALInsertNodeCacheTest.devicePath),
false,
new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
dataTypes,