This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-wal-resource-management in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 77ce31eb61df69bd4c5dadb260df4c6aca16aaea Author: Steve Yurong Su <[email protected]> AuthorDate: Thu May 25 19:57:07 2023 +0800 bind WALPipeHandle --- .../db/engine/storagegroup/TsFileProcessor.java | 10 +++- .../listener/PipeInsertionDataNodeListener.java | 10 ++-- .../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 3 +- .../core/event/impl/PipeTabletInsertionEvent.java | 53 +++++++++++++++------- .../realtime/PipeRealtimeCollectEventFactory.java | 5 +- .../db/pipe/resource/wal/PipeWALResource.java | 4 ++ .../pipe/resource/wal/PipeWALResourceManager.java | 11 +++++ .../apache/iotdb/db/wal/utils/WALPipeHandler.java | 14 ++++++ .../core/collector/PipeRealtimeCollectTest.java | 6 +++ 9 files changed, 91 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index a0a36e3a1c0..6eceea951d7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -277,7 +277,10 @@ public class TsFileProcessor { PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( - dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource); + dataRegionInfo.getDataRegion().getDataRegionId(), + walFlushListener.getWalPipeHandler(), + insertRowNode, + tsFileResource); if (insertRowNode.isAligned()) { workMemTable.insertAlignedRow(insertRowNode); @@ -377,7 +380,10 @@ public class TsFileProcessor { PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( - dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource); + dataRegionInfo.getDataRegion().getDataRegionId(), + walFlushListener.getWalPipeHandler(), + insertTabletNode, + tsFileResource); try { if (insertTabletNode.isAligned()) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java index 778a583f1c7..f2298310544 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector; import org.apache.iotdb.db.pipe.core.collector.realtime.assigner.PipeDataRegionAssigner; import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEventFactory; +import org.apache.iotdb.db.wal.utils.WALPipeHandler; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -84,9 +85,11 @@ public class PipeInsertionDataNodeListener { assigner.publishToAssign(PipeRealtimeCollectEventFactory.createCollectEvent(tsFileResource)); } - // TODO: check whether the method is called on the right place. public void listenToInsertNode( - String dataRegionId, InsertNode insertNode, TsFileResource tsFileResource) { + String dataRegionId, + WALPipeHandler walPipeHandler, + InsertNode insertNode, + TsFileResource tsFileResource) { final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); // only events from registered data region will be collected @@ -95,7 +98,8 @@ public class PipeInsertionDataNodeListener { } assigner.publishToAssign( - PipeRealtimeCollectEventFactory.createCollectEvent(insertNode, tsFileResource)); + PipeRealtimeCollectEventFactory.createCollectEvent( + walPipeHandler, insertNode, tsFileResource)); } /////////////////////////////// singleton /////////////////////////////// diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java index 6b414c03b36..65837f31d42 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq; import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.PipeParameters; @@ -130,7 +131,7 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { } private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent) - throws PipeException, TException { + throws PipeException, TException, WALPipeException { final TPipeTransferResp resp = client.pipeTransfer( PipeTransferInsertNodeReq.toTPipeTransferReq(pipeTabletInsertionEvent.getInsertNode())); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java index 62c599b66c1..476f9e6f12a 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java @@ -21,28 +21,32 @@ package org.apache.iotdb.db.pipe.core.event.impl; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.pipe.core.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.resource.PipeResourceManager; +import org.apache.iotdb.db.wal.exception.WALPipeException; +import org.apache.iotdb.db.wal.utils.WALPipeHandler; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.tsfile.write.record.Tablet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Iterator; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedEvent { - private final InsertNode insertNode; + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletInsertionEvent.class); - private final AtomicInteger referenceCount; + private final WALPipeHandler walPipeHandler; - public PipeTabletInsertionEvent(InsertNode insertNode) { - this.insertNode = insertNode; - this.referenceCount = new AtomicInteger(0); + public PipeTabletInsertionEvent(WALPipeHandler walPipeHandler) { + this.walPipeHandler = walPipeHandler; } - public InsertNode getInsertNode() { - return insertNode; + public InsertNode getInsertNode() throws WALPipeException { + return walPipeHandler.getValue(); } @Override @@ -62,26 +66,41 @@ public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedE @Override public boolean increaseReferenceCount(String holderMessage) { - // TODO: use WALPipeHandler pinMemtable - referenceCount.incrementAndGet(); - return true; + try { + PipeResourceManager.wal().pin(walPipeHandler.getMemTableId(), walPipeHandler); + return true; + } catch (Exception e) { + LOGGER.warn( + String.format( + "Increase reference count for memtable %d error. Holder Message: %s", + walPipeHandler.getMemTableId(), holderMessage), + e); + return false; + } } @Override public boolean decreaseReferenceCount(String holderMessage) { - // TODO: use WALPipeHandler unpinMemetable - referenceCount.decrementAndGet(); - return true; + try { + PipeResourceManager.wal().unpin(walPipeHandler.getMemTableId()); + return true; + } catch (Exception e) { + LOGGER.warn( + String.format( + "Decrease reference count for memtable %d error. Holder Message: %s", + walPipeHandler.getMemTableId(), holderMessage), + e); + return false; + } } @Override public int getReferenceCount() { - // TODO: use WALPipeHandler unpinMemetable - return referenceCount.get(); + return PipeResourceManager.wal().getReferenceCount(walPipeHandler.getMemTableId()); } @Override public String toString() { - return "PipeTabletInsertionEvent{" + "insertNode=" + insertNode + '}'; + return "PipeTabletInsertionEvent{" + "walPipeHandler=" + walPipeHandler + '}'; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java index 4c98c5193be..a0961624dbe 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java @@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.wal.utils.WALPipeHandler; public class PipeRealtimeCollectEventFactory { @@ -34,9 +35,9 @@ public class PipeRealtimeCollectEventFactory { } public static PipeRealtimeCollectEvent createCollectEvent( - InsertNode node, TsFileResource resource) { + WALPipeHandler walPipeHandler, InsertNode insertNode, TsFileResource resource) { return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent( - new PipeTabletInsertionEvent(node), node, resource); + new PipeTabletInsertionEvent(walPipeHandler), insertNode, resource); } private PipeRealtimeCollectEventFactory() { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java index 844420272bd..8d594629bf4 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java @@ -155,4 +155,8 @@ public class PipeWALResource implements AutoCloseable { referenceCount.set(0); } + + public int getReferenceCount() { + return referenceCount.get(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java index c187a29f781..18b942496cd 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java @@ -95,4 +95,15 @@ public class PipeWALResourceManager implements AutoCloseable { } } } + + public int getReferenceCount(long memtableId) { + final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)]; + + lock.lock(); + try { + return memtableIdToPipeWALResourceMap.get(memtableId).getReferenceCount(); + } finally { + lock.unlock(); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java index 80333cf8ffb..3392731f148 100644 --- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java @@ -133,4 +133,18 @@ public class WALPipeHandler { public void setSize(int size) { this.walEntryPosition.setSize(size); } + + @Override + public String toString() { + return "WALPipeHandler{" + + "memTableId=" + + memTableId + + ", value=" + + value + + ", walEntryPosition=" + + walEntryPosition + + ", walNode=" + + walNode + + '}'; + } } diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java index 7cd705af588..d18fc026678 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCo import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector; import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue; +import org.apache.iotdb.db.wal.utils.WALPipeHandler; import org.apache.iotdb.pipe.api.customizer.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -57,6 +58,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import static org.mockito.Mockito.mock; + public class PipeRealtimeCollectTest { private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeCollectTest.class); @@ -229,6 +232,7 @@ public class PipeRealtimeCollectTest { } private Future<?> write2DataRegion(int writeNum, String dataRegionId, int startNum) { + File dataRegionDir = new File(tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0"); boolean ignored = dataRegionDir.mkdirs(); @@ -249,6 +253,7 @@ public class PipeRealtimeCollectTest { PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( dataRegionId, + mock(WALPipeHandler.class), new InsertRowNode( new PlanNodeId(String.valueOf(i)), new PartialPath(device), @@ -262,6 +267,7 @@ public class PipeRealtimeCollectTest { PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( dataRegionId, + mock(WALPipeHandler.class), new InsertRowNode( new PlanNodeId(String.valueOf(i)), new PartialPath(device),
