This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch pipe-wal-resource-management
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-wal-resource-management
by this push:
new 9c88dfcf04b rename
9c88dfcf04b is described below
commit 9c88dfcf04b8e86203fa2cc0ed937b7e39708a29
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu May 25 20:30:34 2023 +0800
rename
---
.../db/engine/storagegroup/TsFileProcessor.java | 4 +--
.../listener/PipeInsertionDataNodeListener.java | 6 ++---
.../core/event/impl/PipeTabletInsertionEvent.java | 22 ++++++++--------
.../realtime/PipeRealtimeCollectEventFactory.java | 6 ++---
.../db/pipe/resource/wal/PipeWALResource.java | 30 +++++++++++-----------
.../pipe/resource/wal/PipeWALResourceManager.java | 6 ++---
.../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 8 +++---
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 4 +--
.../{WALPipeHandler.java => WALEntryHandler.java} | 10 +++-----
.../db/wal/utils/listener/WALFlushListener.java | 10 ++++----
.../core/collector/PipeRealtimeCollectTest.java | 6 ++---
...peHandlerTest.java => WALEntryHandlerTest.java} | 18 ++++++-------
.../iotdb/db/wal/utils/WALInsertNodeCacheTest.java | 8 +++---
13 files changed, 68 insertions(+), 70 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index c2a7f20973e..f89f28f9b2f 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -282,7 +282,7 @@ public class TsFileProcessor {
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionInfo.getDataRegion().getDataRegionId(),
- walFlushListener.getWalPipeHandler(),
+ walFlushListener.getWalEntryHandler(),
insertRowNode,
tsFileResource);
@@ -385,7 +385,7 @@ public class TsFileProcessor {
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionInfo.getDataRegion().getDataRegionId(),
- walFlushListener.getWalPipeHandler(),
+ walFlushListener.getWalEntryHandler(),
insertTabletNode,
tsFileResource);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
index f2298310544..d39a2fb9dc4 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -24,7 +24,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
import
org.apache.iotdb.db.pipe.core.collector.realtime.assigner.PipeDataRegionAssigner;
import
org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEventFactory;
-import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -87,7 +87,7 @@ public class PipeInsertionDataNodeListener {
public void listenToInsertNode(
String dataRegionId,
- WALPipeHandler walPipeHandler,
+ WALEntryHandler walEntryHandler,
InsertNode insertNode,
TsFileResource tsFileResource) {
final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
@@ -99,7 +99,7 @@ public class PipeInsertionDataNodeListener {
assigner.publishToAssign(
PipeRealtimeCollectEventFactory.createCollectEvent(
- walPipeHandler, insertNode, tsFileResource));
+ walEntryHandler, insertNode, tsFileResource));
}
/////////////////////////////// singleton ///////////////////////////////
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index 476f9e6f12a..99d9da055ee 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -23,7 +23,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.wal.exception.WALPipeException;
-import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -39,14 +39,14 @@ public class PipeTabletInsertionEvent implements
TabletInsertionEvent, EnrichedE
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletInsertionEvent.class);
- private final WALPipeHandler walPipeHandler;
+ private final WALEntryHandler walEntryHandler;
- public PipeTabletInsertionEvent(WALPipeHandler walPipeHandler) {
- this.walPipeHandler = walPipeHandler;
+ public PipeTabletInsertionEvent(WALEntryHandler walEntryHandler) {
+ this.walEntryHandler = walEntryHandler;
}
public InsertNode getInsertNode() throws WALPipeException {
- return walPipeHandler.getValue();
+ return walEntryHandler.getValue();
}
@Override
@@ -67,13 +67,13 @@ public class PipeTabletInsertionEvent implements
TabletInsertionEvent, EnrichedE
@Override
public boolean increaseReferenceCount(String holderMessage) {
try {
- PipeResourceManager.wal().pin(walPipeHandler.getMemTableId(),
walPipeHandler);
+ PipeResourceManager.wal().pin(walEntryHandler.getMemTableId(),
walEntryHandler);
return true;
} catch (Exception e) {
LOGGER.warn(
String.format(
"Increase reference count for memtable %d error. Holder Message:
%s",
- walPipeHandler.getMemTableId(), holderMessage),
+ walEntryHandler.getMemTableId(), holderMessage),
e);
return false;
}
@@ -82,13 +82,13 @@ public class PipeTabletInsertionEvent implements
TabletInsertionEvent, EnrichedE
@Override
public boolean decreaseReferenceCount(String holderMessage) {
try {
- PipeResourceManager.wal().unpin(walPipeHandler.getMemTableId());
+ PipeResourceManager.wal().unpin(walEntryHandler.getMemTableId());
return true;
} catch (Exception e) {
LOGGER.warn(
String.format(
"Decrease reference count for memtable %d error. Holder Message:
%s",
- walPipeHandler.getMemTableId(), holderMessage),
+ walEntryHandler.getMemTableId(), holderMessage),
e);
return false;
}
@@ -96,11 +96,11 @@ public class PipeTabletInsertionEvent implements
TabletInsertionEvent, EnrichedE
@Override
public int getReferenceCount() {
- return
PipeResourceManager.wal().getReferenceCount(walPipeHandler.getMemTableId());
+ return
PipeResourceManager.wal().getReferenceCount(walEntryHandler.getMemTableId());
}
@Override
public String toString() {
- return "PipeTabletInsertionEvent{" + "walPipeHandler=" + walPipeHandler +
'}';
+ return "PipeTabletInsertionEvent{" + "walEntryHandler=" + walEntryHandler
+ '}';
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index a0961624dbe..a4b453e4322 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
-import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
public class PipeRealtimeCollectEventFactory {
@@ -35,9 +35,9 @@ public class PipeRealtimeCollectEventFactory {
}
public static PipeRealtimeCollectEvent createCollectEvent(
- WALPipeHandler walPipeHandler, InsertNode insertNode, TsFileResource
resource) {
+ WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource
resource) {
return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(
- new PipeTabletInsertionEvent(walPipeHandler), insertNode, resource);
+ new PipeTabletInsertionEvent(walEntryHandler), insertNode, resource);
}
private PipeRealtimeCollectEventFactory() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 8d594629bf4..9ccb946ae45 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.resource.wal;
import org.apache.iotdb.db.wal.exception.MemTablePinException;
-import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
@@ -35,7 +35,7 @@ public class PipeWALResource implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeWALResource.class);
- private final WALPipeHandler walPipeHandler;
+ private final WALEntryHandler walEntryHandler;
private final AtomicInteger referenceCount;
@@ -44,8 +44,8 @@ public class PipeWALResource implements AutoCloseable {
private final AtomicLong lastLogicalPinTime;
private final AtomicBoolean isPhysicallyPinned;
- public PipeWALResource(WALPipeHandler walPipeHandler) {
- this.walPipeHandler = walPipeHandler;
+ public PipeWALResource(WALEntryHandler walEntryHandler) {
+ this.walEntryHandler = walEntryHandler;
referenceCount = new AtomicInteger(0);
@@ -57,15 +57,15 @@ public class PipeWALResource implements AutoCloseable {
if (referenceCount.get() == 0) {
if (!isPhysicallyPinned.get()) {
try {
- walPipeHandler.pinMemTable();
+ walEntryHandler.pinMemTable();
} catch (MemTablePinException e) {
throw new PipeRuntimeNonCriticalException(
String.format(
"failed to pin wal %d, because %s",
- walPipeHandler.getMemTableId(), e.getMessage()));
+ walEntryHandler.getMemTableId(), e.getMessage()));
}
isPhysicallyPinned.set(true);
- LOGGER.info("wal {} is pinned by pipe engine",
walPipeHandler.getMemTableId());
+ LOGGER.info("wal {} is pinned by pipe engine",
walEntryHandler.getMemTableId());
} // else means the wal is already pinned, do nothing
// no matter the wal is pinned or not, update the last pin time
@@ -84,7 +84,7 @@ public class PipeWALResource implements AutoCloseable {
throw new PipeRuntimeCriticalException(
String.format(
"wal %d is unpinned more than pinned, this should not happen",
- walPipeHandler.getMemTableId()));
+ walEntryHandler.getMemTableId()));
}
referenceCount.decrementAndGet();
@@ -113,17 +113,17 @@ public class PipeWALResource implements AutoCloseable {
if (isPhysicallyPinned.get()) {
if (System.currentTimeMillis() - lastLogicalPinTime.get() >
MIN_TIME_TO_LIVE_IN_MS) {
try {
- walPipeHandler.unpinMemTable();
+ walEntryHandler.unpinMemTable();
} catch (MemTablePinException e) {
throw new PipeRuntimeNonCriticalException(
String.format(
"failed to unpin wal %d, because %s",
- walPipeHandler.getMemTableId(), e.getMessage()));
+ walEntryHandler.getMemTableId(), e.getMessage()));
}
isPhysicallyPinned.set(false);
LOGGER.info(
"wal {} is unpinned by pipe engine when checking time to live",
- walPipeHandler.getMemTableId());
+ walEntryHandler.getMemTableId());
return true;
} else {
return false;
@@ -131,7 +131,7 @@ public class PipeWALResource implements AutoCloseable {
} else {
LOGGER.info(
"wal {} is not pinned physically when checking time to live",
- walPipeHandler.getMemTableId());
+ walEntryHandler.getMemTableId());
return true;
}
}
@@ -140,17 +140,17 @@ public class PipeWALResource implements AutoCloseable {
public void close() {
if (isPhysicallyPinned.get()) {
try {
- walPipeHandler.unpinMemTable();
+ walEntryHandler.unpinMemTable();
} catch (MemTablePinException e) {
LOGGER.error(
"failed to unpin wal {} when closing pipe wal resource, because
{}",
- walPipeHandler.getMemTableId(),
+ walEntryHandler.getMemTableId(),
e.getMessage());
}
isPhysicallyPinned.set(false);
LOGGER.info(
"wal {} is unpinned by pipe engine when closing pipe wal resource",
- walPipeHandler.getMemTableId());
+ walEntryHandler.getMemTableId());
}
referenceCount.set(0);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 18b942496cd..3bdd18c9670 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -2,7 +2,7 @@
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import java.util.HashMap;
import java.util.Map;
@@ -53,13 +53,13 @@ public class PipeWALResourceManager implements
AutoCloseable {
TimeUnit.MILLISECONDS);
}
- public void pin(long memtableId, WALPipeHandler walPipeHandler) {
+ public void pin(long memtableId, WALEntryHandler walEntryHandler) {
final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId %
SEGMENT_LOCK_COUNT)];
lock.lock();
try {
memtableIdToPipeWALResourceMap
- .computeIfAbsent(memtableId, id -> new
PipeWALResource(walPipeHandler))
+ .computeIfAbsent(memtableId, id -> new
PipeWALResource(walEntryHandler))
.pin();
} finally {
lock.unlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index bd4a5dd5f13..9aff438a835 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -256,7 +256,7 @@ public class WALBuffer extends AbstractWALBuffer {
// update related info
totalSize += size;
info.metaData.add(size, searchIndex);
- walEntry.getWalFlushListener().getWalPipeHandler().setSize(size);
+ walEntry.getWalFlushListener().getWalEntryHandler().setSize(size);
info.fsyncListeners.add(walEntry.getWalFlushListener());
}
@@ -495,9 +495,9 @@ public class WALBuffer extends AbstractWALBuffer {
if (forceSuccess) {
for (WALFlushListener fsyncListener : info.fsyncListeners) {
fsyncListener.succeed();
- if (fsyncListener.getWalPipeHandler() != null) {
-
fsyncListener.getWalPipeHandler().setEntryPosition(walFileVersionId, position);
- position += fsyncListener.getWalPipeHandler().getSize();
+ if (fsyncListener.getWalEntryHandler() != null) {
+
fsyncListener.getWalEntryHandler().setEntryPosition(walFileVersionId, position);
+ position += fsyncListener.getWalEntryHandler().getSize();
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 4153c3c49f5..3eacc838cec 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -150,8 +150,8 @@ public class WALNode implements IWALNode {
private WALFlushListener log(WALEntry walEntry) {
buffer.write(walEntry);
// set handler for pipe
-
walEntry.getWalFlushListener().getWalPipeHandler().setMemTableId(walEntry.getMemTableId());
- walEntry.getWalFlushListener().getWalPipeHandler().setWalNode(this);
+
walEntry.getWalFlushListener().getWalEntryHandler().setMemTableId(walEntry.getMemTableId());
+ walEntry.getWalFlushListener().getWalEntryHandler().setWalNode(this);
return walEntry.getWalFlushListener();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryHandler.java
similarity index 96%
rename from
server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
rename to
server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryHandler.java
index 3392731f148..e8849dc6536 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryHandler.java
@@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory;
* This handler is used by the Pipe to find the corresponding insert node.
Besides, it can try to
* pin/unpin the wal entries by the memTable id.
*/
-public class WALPipeHandler {
- private static final Logger logger =
LoggerFactory.getLogger(WALPipeHandler.class);
+public class WALEntryHandler {
+ private static final Logger logger =
LoggerFactory.getLogger(WALEntryHandler.class);
private long memTableId = -1;
/** cached value, null after this value is flushed to wal successfully */
@@ -42,7 +42,7 @@ public class WALPipeHandler {
/** wal node, null when wal is disabled */
private WALNode walNode = null;
- public WALPipeHandler(WALEntryValue value) {
+ public WALEntryHandler(WALEntryValue value) {
this.value = value;
}
@@ -136,15 +136,13 @@ public class WALPipeHandler {
@Override
public String toString() {
- return "WALPipeHandler{"
+ return "WALEntryHandler{"
+ "memTableId="
+ memTableId
+ ", value="
+ value
+ ", walEntryPosition="
+ walEntryPosition
- + ", walNode="
- + walNode
+ '}';
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
index 1b468635899..2fecd1f585c 100644
---
a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
+++
b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
@@ -19,19 +19,19 @@
package org.apache.iotdb.db.wal.utils.listener;
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
-import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
/** This class helps judge whether wal is flushed to the storage device. */
public class WALFlushListener extends AbstractResultListener {
// handler for pipeline, only exists then value is InsertNode
- private final WALPipeHandler walPipeHandler;
+ private final WALEntryHandler walEntryHandler;
public WALFlushListener(boolean wait, WALEntryValue value) {
super(wait);
- walPipeHandler = new WALPipeHandler(value);
+ walEntryHandler = new WALEntryHandler(value);
}
- public WALPipeHandler getWalPipeHandler() {
- return walPipeHandler;
+ public WALEntryHandler getWalEntryHandler() {
+ return walEntryHandler;
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index d18fc026678..45b0cf504b2 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -30,7 +30,7 @@ import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCo
import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
import
org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
-import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -253,7 +253,7 @@ public class PipeRealtimeCollectTest {
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionId,
- mock(WALPipeHandler.class),
+ mock(WALEntryHandler.class),
new InsertRowNode(
new PlanNodeId(String.valueOf(i)),
new PartialPath(device),
@@ -267,7 +267,7 @@ public class PipeRealtimeCollectTest {
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionId,
- mock(WALPipeHandler.class),
+ mock(WALEntryHandler.class),
new InsertRowNode(
new PlanNodeId(String.valueOf(i)),
new PartialPath(device),
diff --git
a/server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java
b/server/src/test/java/org/apache/iotdb/db/wal/node/WALEntryHandlerTest.java
similarity index 94%
rename from
server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java
rename to
server/src/test/java/org/apache/iotdb/db/wal/node/WALEntryHandlerTest.java
index 77c28c6d333..d34ca54cae8 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/WALEntryHandlerTest.java
@@ -33,8 +33,8 @@ import org.apache.iotdb.db.wal.buffer.WALEntry;
import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
import org.apache.iotdb.db.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import org.apache.iotdb.db.wal.utils.WALMode;
-import org.apache.iotdb.db.wal.utils.WALPipeHandler;
import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -49,7 +49,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-public class WALPipeHandlerTest {
+public class WALEntryHandlerTest {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
private static final String identifier = String.valueOf(Integer.MAX_VALUE);
private static final String logDirectory =
TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
@@ -85,7 +85,7 @@ public class WALPipeHandlerTest {
memTable.getMemTableId(), getInsertRowNode(devicePath,
System.currentTimeMillis()));
walNode.onMemTableFlushed(memTable);
// pin flushed memTable
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
}
@@ -97,7 +97,7 @@ public class WALPipeHandlerTest {
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode.log(memTable.getMemTableId(),
node1);
// pin memTable
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
walNode.onMemTableFlushed(memTable);
// roll wal file
@@ -128,7 +128,7 @@ public class WALPipeHandlerTest {
memTable.getMemTableId(), getInsertRowNode(devicePath,
System.currentTimeMillis()));
walNode.onMemTableFlushed(memTable);
// pin flushed memTable
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.unpinMemTable();
}
@@ -139,7 +139,7 @@ public class WALPipeHandlerTest {
WALFlushListener flushListener =
walNode.log(
memTable.getMemTableId(), getInsertRowNode(devicePath,
System.currentTimeMillis()));
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
// pin twice
handler.pinMemTable();
handler.pinMemTable();
@@ -164,7 +164,7 @@ public class WALPipeHandlerTest {
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode.log(memTable.getMemTableId(),
node1);
// pin memTable
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
walNode.onMemTableFlushed(memTable);
// roll wal file
@@ -193,7 +193,7 @@ public class WALPipeHandlerTest {
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode.log(memTable.getMemTableId(),
node1);
// pin memTable
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
walNode.onMemTableFlushed(memTable);
assertEquals(node1, handler.getValue());
@@ -207,7 +207,7 @@ public class WALPipeHandlerTest {
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode.log(memTable.getMemTableId(),
node1);
// pin memTable
- WALPipeHandler handler = flushListener.getWalPipeHandler();
+ WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
walNode.onMemTableFlushed(memTable);
// wait until wal flushed
diff --git
a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
index 98459619a70..1e5dc245cc9 100644
---
a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
@@ -79,7 +79,7 @@ public class WALInsertNodeCacheTest {
InsertRowNode node1 = getInsertRowNode(devicePath,
System.currentTimeMillis());
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode.log(memTable.getMemTableId(),
node1);
- WALEntryPosition position =
flushListener.getWalPipeHandler().getWalEntryPosition();
+ WALEntryPosition position =
flushListener.getWalEntryHandler().getWalEntryPosition();
// wait until wal flushed
while (!walNode.isAllWALEntriesConsumed() || !position.canRead()) {
Thread.sleep(50);
@@ -96,18 +96,18 @@ public class WALInsertNodeCacheTest {
InsertRowNode node1 = getInsertRowNode(devicePath,
System.currentTimeMillis());
node1.setSearchIndex(1);
WALFlushListener flushListener1 = walNode.log(memTable1.getMemTableId(),
node1);
- WALEntryPosition position1 =
flushListener1.getWalPipeHandler().getWalEntryPosition();
+ WALEntryPosition position1 =
flushListener1.getWalEntryHandler().getWalEntryPosition();
InsertRowNode node2 = getInsertRowNode(devicePath,
System.currentTimeMillis());
node1.setSearchIndex(2);
WALFlushListener flushListener2 = walNode.log(memTable1.getMemTableId(),
node2);
- WALEntryPosition position2 =
flushListener2.getWalPipeHandler().getWalEntryPosition();
+ WALEntryPosition position2 =
flushListener2.getWalEntryHandler().getWalEntryPosition();
// write memTable2
IMemTable memTable2 = new PrimitiveMemTable();
walNode.onMemTableCreated(memTable2, logDirectory + "/" + "fake2.tsfile");
InsertRowNode node3 = getInsertRowNode(devicePath,
System.currentTimeMillis());
node1.setSearchIndex(3);
WALFlushListener flushListener3 = walNode.log(memTable2.getMemTableId(),
node3);
- WALEntryPosition position3 =
flushListener3.getWalPipeHandler().getWalEntryPosition();
+ WALEntryPosition position3 =
flushListener3.getWalEntryHandler().getWalEntryPosition();
// wait until wal flushed
walNode.rollWALFile();
while (!walNode.isAllWALEntriesConsumed() || !position3.canRead()) {