This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bbe062515c Pipe: fix recovering huge entry from wal with wrong position bug & fix multi pipes sharing the same ByteBuffer (#11285)
5bbe062515c is described below

commit 5bbe062515cad88d98e0c47bcfb22178041f6cd6
Author: Caideyipi <[email protected]>
AuthorDate: Thu Oct 12 16:17:28 2023 +0800

    Pipe: fix recovering huge entry from wal with wrong position bug & fix multi pipes sharing the same ByteBuffer (#11285)
---
 .../dataregion/wal/buffer/WALBuffer.java           |  30 +++-
 .../storageengine/dataregion/wal/node/WALNode.java |   5 +
 .../dataregion/wal/utils/WALInsertNodeCache.java   |  19 ++-
 .../wal/utils/WALInsertNodeCacheTest.java          | 155 +++++++++++++++------
 4 files changed, 165 insertions(+), 44 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
index a87ee1422d0..0856e4622ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
@@ -73,6 +74,8 @@ public class WALBuffer extends AbstractWALBuffer {
   private final Lock buffersLock = new ReentrantLock();
   // condition to guarantee correctness of switching buffers
   private final Condition idleBufferReadyCondition = 
buffersLock.newCondition();
+  // last writer position when fsync is called, help record each entry's 
position
+  private long lastFsyncPosition;
   // region these variables should be protected by buffersLock
   /** two buffers switch between three statuses (there is always 1 buffer 
working). */
   // buffer in working status, only updated by serializeThread
@@ -126,6 +129,30 @@ public class WALBuffer extends AbstractWALBuffer {
     }
   }
 
+  /**
+   * Test-only hook that replaces the working and idle buffers with two freshly allocated direct
+   * buffers of {@code size / 2} bytes each. Any existing working, idle and syncing buffers are
+   * first passed to {@code MmapUtil.clean}. Runs under {@code buffersLock}.
+   *
+   * @param size total size to split evenly between the new working and idle buffers
+   * @throws OutOfMemoryError if a direct buffer cannot be allocated; the buffer is closed first
+   */
+  @TestOnly
+  public void setBufferSize(int size) {
+    buffersLock.lock();
+    try {
+      if (workingBuffer != null) {
+        MmapUtil.clean((MappedByteBuffer) workingBuffer);
+      }
+      if (idleBuffer != null) {
+        // Clean idleBuffer itself. Cleaning workingBuffer here would clean the same direct buffer
+        // twice and leave the idle buffer unreleased.
+        MmapUtil.clean((MappedByteBuffer) idleBuffer);
+      }
+      if (syncingBuffer != null) {
+        // NOTE(review): syncingBuffer still references the cleaned buffer after this call;
+        // confirm nothing reads or cleans it again before the next buffer switch.
+        MmapUtil.clean((MappedByteBuffer) syncingBuffer);
+      }
+      workingBuffer = ByteBuffer.allocateDirect(size / 2);
+      idleBuffer = ByteBuffer.allocateDirect(size / 2);
+    } catch (OutOfMemoryError e) {
+      logger.error("Fail to allocate wal node-{}'s buffer because out of memory.", identifier, e);
+      close();
+      throw e;
+    } finally {
+      buffersLock.unlock();
+    }
+  }
+
   @Override
   public void write(WALEntry walEntry) {
     if (isClosed) {
@@ -446,7 +473,6 @@ public class WALBuffer extends AbstractWALBuffer {
     public void run() {
       final long startTime = System.nanoTime();
       long walFileVersionId = currentWALFileVersion;
-      long position = currentWALFileWriter.size();
       currentWALFileWriter.updateFileStatus(fileStatus);
 
       // calculate buffer used ratio
@@ -504,6 +530,7 @@ public class WALBuffer extends AbstractWALBuffer {
 
       // notify all waiting listeners
       if (forceSuccess) {
+        long position = lastFsyncPosition;
         for (WALFlushListener fsyncListener : info.fsyncListeners) {
           fsyncListener.succeed();
           if (fsyncListener.getWalEntryHandler() != null) {
@@ -511,6 +538,7 @@ public class WALBuffer extends AbstractWALBuffer {
             position += fsyncListener.getWalEntryHandler().getSize();
           }
         }
+        lastFsyncPosition = currentWALFileWriter.size();
       }
       WRITING_METRICS.recordWALBufferEntriesCount(info.fsyncListeners.size());
       WRITING_METRICS.recordSyncWALBufferCost(System.nanoTime() - startTime, 
forceFlag);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 1bfa5dd7b06..546638c9f99 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -939,4 +939,9 @@ public class WALNode implements IWALNode {
   CheckpointManager getCheckpointManager() {
     return checkpointManager;
   }
+
+  /**
+   * Test-only hook: forwards the requested size to this node's {@code buffer} via its own
+   * {@code setBufferSize}.
+   *
+   * @param size the size value handed unchanged to the underlying buffer
+   */
+  @TestOnly
+  public void setBufferSize(int size) {
+    this.buffer.setBufferSize(size);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 04fc125fdf7..cc9ab8effc9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -54,7 +54,7 @@ public class WALInsertNodeCache {
 
   // LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
   private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>> 
lruCache;
-  private final boolean isBatchLoadEnabled;
+  private boolean isBatchLoadEnabled;
 
   // ids of all pinned memTables
   private final Set<Long> memTablesNeedSearch = ConcurrentHashMap.newKeySet();
@@ -77,6 +77,11 @@ public class WALInsertNodeCache {
     return isBatchLoadEnabled;
   }
 
+  /**
+   * Test-only override of the {@code isBatchLoadEnabled} flag, which {@code getInsertNode} reads to
+   * choose how it looks up a WAL entry.
+   *
+   * @param isBatchLoadEnabled the new value of the flag
+   */
+  // NOTE(review): the field is a plain non-volatile boolean, so a value set here is only reliably
+  // visible to other threads after some other synchronization; fine when tests set it up front.
+  @TestOnly
+  public void setIsBatchLoadEnabled(boolean isBatchLoadEnabled) {
+    this.isBatchLoadEnabled = isBatchLoadEnabled;
+  }
+
   public InsertNode getInsertNode(WALEntryPosition position) {
     final Pair<ByteBuffer, InsertNode> pair =
         isBatchLoadEnabled
@@ -88,7 +93,17 @@ public class WALInsertNodeCache {
     }
 
     if (pair.getRight() == null) {
-      pair.setRight(parse(pair.getLeft()));
+      try {
+        pair.setRight(parse(ByteBuffer.wrap(pair.getLeft().array())));
+      } catch (Exception e) {
+        logger.error(
+            "Parsing failed when recovering insertNode from wal, walFile:{}, 
position:{}, size:{}, exception:",
+            position.getWalFile(),
+            position.getPosition(),
+            position.getSize(),
+            e);
+        throw e;
+      }
     }
 
     return pair.getRight();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
index 18817591195..bcec9915996 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
@@ -34,10 +34,15 @@ import 
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -75,62 +80,130 @@ public class WALInsertNodeCacheTest {
   }
 
   @Test
-  public void testLoadUnsealedWALFile() throws Exception {
+  public void testLoadAfterSyncBuffer() throws IllegalPathException {
+    try {
+      // Shrink the WAL buffer (16 bytes total, i.e. 8 bytes per buffer) so that logging the row
+      // below forces a buffer sync, exercising position bookkeeping across syncs.
+      walNode.setBufferSize(16);
+      // write memTable
+      IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
+      walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+      InsertRowNode node1 = getInsertRowNode(System.currentTimeMillis());
+      node1.setSearchIndex(1);
+      WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
+      WALEntryPosition position = flushListener.getWalEntryHandler().getWalEntryPosition();
+      // wait until wal flushed
+      walNode.rollWALFile();
+      Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed() && position.canRead());
+      // load by cache: the recorded position must point at the entry that was written
+      assertEquals(node1, cache.getInsertNode(position));
+    } finally {
+      // restore the configured size so later tests run with the normal buffer
+      walNode.setBufferSize(config.getWalBufferSize());
+    }
+  }
+
+  @Test
+  public void testGetInsertNodeInParallel() throws IllegalPathException {
+    // write memTable
     IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
     walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
-    InsertRowNode node1 = getInsertRowNode(devicePath, 
System.currentTimeMillis());
+    InsertRowNode node1 = getInsertRowNode(System.currentTimeMillis());
     node1.setSearchIndex(1);
     WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), 
node1);
     WALEntryPosition position = 
flushListener.getWalEntryHandler().getWalEntryPosition();
     // wait until wal flushed
-    while (!walNode.isAllWALEntriesConsumed() || !position.canRead()) {
-      Thread.sleep(50);
+    walNode.rollWALFile();
+    Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed() && 
position.canRead());
+    // Test getInsertNode in parallel to detect buffer concurrent problem
+    // Each worker catches any Throwable (including assertion errors) and only records it in this
+    // flag; the main thread asserts on the flag after all workers have finished.
+    // NOTE(review): the caught Throwable itself is discarded, so a failure reports no cause.
+    AtomicBoolean failure = new AtomicBoolean(false);
+    List<Thread> threadList = new ArrayList<>(5);
+    for (int i = 0; i < 5; ++i) {
+      Thread getInsertNodeThread =
+          new Thread(
+              () -> {
+                try {
+                  assertEquals(node1, cache.getInsertNode(position));
+                } catch (Throwable e) {
+                  failure.set(true);
+                }
+              });
+      threadList.add(getInsertNodeThread);
+      getInsertNodeThread.start();
     }
+    // Poll until no worker thread is alive. No atMost() is given, so Awaitility's default
+    // timeout applies.
+    Awaitility.await()
+        .until(
+            () -> {
+              for (Thread thread : threadList) {
+                if (thread.isAlive()) {
+                  return false;
+                }
+              }
+              return true;
+            });
+    assertFalse(failure.get());
+  }
+
+  @Test
+  public void testLoadUnsealedWALFile() throws Exception {
+    IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+    InsertRowNode node1 = getInsertRowNode(System.currentTimeMillis());
+    node1.setSearchIndex(1);
+    WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), 
node1);
+    WALEntryPosition position = 
flushListener.getWalEntryHandler().getWalEntryPosition();
+    // wait until wal flushed
+    // Unlike the neighbouring tests, walNode.rollWALFile() is not called here, so the entry is
+    // presumably read back from the WAL file that is still open for writing (unsealed).
+    Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed() && 
position.canRead());
     // load by cache
     assertEquals(node1, cache.getInsertNode(position));
   }
 
   @Test
   public void testBatchLoad() throws Exception {
-    // write memTable1
-    IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
-    walNode.onMemTableCreated(memTable1, logDirectory + "/" + "fake1.tsfile");
-    InsertRowNode node1 = getInsertRowNode(devicePath, 
System.currentTimeMillis());
-    node1.setSearchIndex(1);
-    WALFlushListener flushListener1 = walNode.log(memTable1.getMemTableId(), 
node1);
-    WALEntryPosition position1 = 
flushListener1.getWalEntryHandler().getWalEntryPosition();
-    InsertRowNode node2 = getInsertRowNode(devicePath, 
System.currentTimeMillis());
-    node1.setSearchIndex(2);
-    WALFlushListener flushListener2 = walNode.log(memTable1.getMemTableId(), 
node2);
-    WALEntryPosition position2 = 
flushListener2.getWalEntryHandler().getWalEntryPosition();
-    // write memTable2
-    IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
-    walNode.onMemTableCreated(memTable2, logDirectory + "/" + "fake2.tsfile");
-    InsertRowNode node3 = getInsertRowNode(devicePath, 
System.currentTimeMillis());
-    node1.setSearchIndex(3);
-    WALFlushListener flushListener3 = walNode.log(memTable2.getMemTableId(), 
node3);
-    WALEntryPosition position3 = 
flushListener3.getWalEntryHandler().getWalEntryPosition();
-    // wait until wal flushed
-    walNode.rollWALFile();
-    while (!walNode.isAllWALEntriesConsumed() || !position3.canRead()) {
-      Thread.sleep(50);
+    // Enable batch load on the singleton cache for this test only; restored in the finally block
+    boolean oldIsBatchLoadEnabled = WALInsertNodeCache.getInstance().isBatchLoadEnabled();
+    WALInsertNodeCache.getInstance().setIsBatchLoadEnabled(true);
+    try {
+      // write memTable1
+      IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+      walNode.onMemTableCreated(memTable1, logDirectory + "/" + "fake1.tsfile");
+      InsertRowNode node1 = getInsertRowNode(System.currentTimeMillis());
+      node1.setSearchIndex(1);
+      WALFlushListener flushListener1 = walNode.log(memTable1.getMemTableId(), node1);
+      WALEntryPosition position1 = flushListener1.getWalEntryHandler().getWalEntryPosition();
+      InsertRowNode node2 = getInsertRowNode(System.currentTimeMillis());
+      // Set the index on node2 itself. node1 has already been handed to walNode.log above, and
+      // mutating it afterwards would change the entry the assertions below compare against.
+      node2.setSearchIndex(2);
+      WALFlushListener flushListener2 = walNode.log(memTable1.getMemTableId(), node2);
+      WALEntryPosition position2 = flushListener2.getWalEntryHandler().getWalEntryPosition();
+      // write memTable2
+      IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+      walNode.onMemTableCreated(memTable2, logDirectory + "/" + "fake2.tsfile");
+      InsertRowNode node3 = getInsertRowNode(System.currentTimeMillis());
+      node3.setSearchIndex(3);
+      WALFlushListener flushListener3 = walNode.log(memTable2.getMemTableId(), node3);
+      WALEntryPosition position3 = flushListener3.getWalEntryHandler().getWalEntryPosition();
+      // wait until wal flushed
+      walNode.rollWALFile();
+      Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed() && position3.canRead());
+      // check batch load memTable1
+      cache.addMemTable(memTable1.getMemTableId());
+      assertEquals(node1, cache.getInsertNode(position1));
+      assertTrue(cache.contains(position1));
+      assertEquals(
+          WALInsertNodeCache.getInstance().isBatchLoadEnabled(), cache.contains(position2));
+      assertFalse(cache.contains(position3));
+      // check batch load none
+      cache.removeMemTable(memTable1.getMemTableId());
+      cache.clear();
+      assertEquals(node1, cache.getInsertNode(position1));
+      assertTrue(cache.contains(position1));
+      assertFalse(cache.contains(position2));
+      assertFalse(cache.contains(position3));
+    } finally {
+      // restore the singleton's previous setting so other tests are unaffected
+      WALInsertNodeCache.getInstance().setIsBatchLoadEnabled(oldIsBatchLoadEnabled);
     }
-    // check batch load memTable1
-    cache.addMemTable(memTable1.getMemTableId());
-    assertEquals(node1, cache.getInsertNode(position1));
-    assertTrue(cache.contains(position1));
-    assertEquals(WALInsertNodeCache.getInstance().isBatchLoadEnabled(), 
cache.contains(position2));
-    assertFalse(cache.contains(position3));
-    // check batch load none
-    cache.removeMemTable(memTable1.getMemTableId());
-    cache.clear();
-    assertEquals(node1, cache.getInsertNode(position1));
-    assertTrue(cache.contains(position1));
-    assertFalse(cache.contains(position2));
-    assertFalse(cache.contains(position3));
   }
 
-  private InsertRowNode getInsertRowNode(String devicePath, long time) throws 
IllegalPathException {
+  private InsertRowNode getInsertRowNode(long time) throws 
IllegalPathException {
     TSDataType[] dataTypes =
         new TSDataType[] {
           TSDataType.DOUBLE,
@@ -152,7 +225,7 @@ public class WALInsertNodeCacheTest {
     InsertRowNode node =
         new InsertRowNode(
             new PlanNodeId(""),
-            new PartialPath(devicePath),
+            new PartialPath(WALInsertNodeCacheTest.devicePath),
             false,
             new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
             dataTypes,

Reply via email to