This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-pipe-ref-issue in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e03654a418ae8bba0782ab1be4a18de393a77d82 Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Aug 5 04:55:24 2023 +0800 1. One reason data was occasionally lost in extreme scenarios: the Disruptor was configured with the wrong producer type. We have multiple producers (one for log events and one for TsFile events) that publish concurrently, so a single-producer ring buffer lets entries overwrite each other. See https://stackoverflow.com/questions/18997398/lmax-disruptor-is-too-slow-in-multi-producer-mode-compared-to-single-producer-mo 2. Hybrid mode could indeed pin WAL for a long time, because some ignored events did not have their reference counts correctly decremented to zero on early-exit paths. --- .../PipeRealtimeDataRegionHybridExtractor.java | 97 +++++++++++++--------- .../PipeRealtimeDataRegionLogExtractor.java | 3 +- .../PipeRealtimeDataRegionTsFileExtractor.java | 3 +- .../realtime/assigner/DisruptorQueue.java | 84 +++++++------------ .../realtime/assigner/PipeDataRegionAssigner.java | 10 +-- .../db/pipe/resource/wal/PipeWALResource.java | 4 + .../pipe/resource/wal/PipeWALResourceManager.java | 12 ++- 7 files changed, 111 insertions(+), 102 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java index 026a3334d54..9b8ff792be4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -77,31 +77,37 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio // all the data represented by the tablet events should be carried by the following tsfile // event.
event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); - LOGGER.info( - "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor " - + "is approaching capacity, discard tablet event {}, change state of tsfile epoch to {}", - event, - event.getTsFileEpoch().getState(this)); - - // Ignore the tablet event. - event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); - return; } - if (!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE) - && !pendingQueue.waitedOffer(event)) { - // this would not happen, but just in case. - // pendingQueue is unbounded, so it should never reach capacity. - final String errorMessage = - String.format( - "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s " - + "has reached capacity, discard tablet event %s, current state %s", - this, event, event.getTsFileEpoch().getState(this)); - LOGGER.error(errorMessage); - PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); - - // Ignore the tablet event. - event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); + final TsFileEpoch.State state = event.getTsFileEpoch().getState(this); + switch (state) { + case USING_TSFILE: + // Ignore the tablet event. + event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); + break; + case EMPTY: + case USING_TABLET: + if (!pendingQueue.waitedOffer(event)) { + // this would not happen, but just in case. + // pendingQueue is unbounded, so it should never reach capacity. 
+ final String errorMessage = + String.format( + "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s " + + "has reached capacity, discard tablet event %s, current state %s", + this, event, event.getTsFileEpoch().getState(this)); + LOGGER.error(errorMessage); + PipeAgent.runtime() + .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); + + // Ignore the tablet event. + event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); + } + break; + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported state %s for hybrid realtime extractor %s", + state, PipeRealtimeDataRegionHybridExtractor.class.getName())); } } @@ -113,19 +119,35 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio state -> state.equals(TsFileEpoch.State.EMPTY) ? TsFileEpoch.State.USING_TSFILE : state); - if (!pendingQueue.waitedOffer(event)) { - // this would not happen, but just in case. - // pendingQueue is unbounded, so it should never reach capacity. - final String errorMessage = - String.format( - "extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s " - + "has reached capacity, discard TsFile event %s, current state %s", - this, event, event.getTsFileEpoch().getState(this)); - LOGGER.error(errorMessage); - PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); - - // Ignore the tsfile event. - event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); + final TsFileEpoch.State state = event.getTsFileEpoch().getState(this); + switch (state) { + case EMPTY: + case USING_TSFILE: + if (!pendingQueue.waitedOffer(event)) { + // this would not happen, but just in case. + // pendingQueue is unbounded, so it should never reach capacity. 
+ final String errorMessage = + String.format( + "extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s " + + "has reached capacity, discard TsFile event %s, current state %s", + this, event, event.getTsFileEpoch().getState(this)); + LOGGER.error(errorMessage); + PipeAgent.runtime() + .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); + + // Ignore the tsfile event. + event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); + } + break; + case USING_TABLET: + // All the tablet events have been extracted, so we can ignore the tsfile event. + event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); + break; + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported state %s for hybrid realtime extractor %s", + state, PipeRealtimeDataRegionHybridExtractor.class.getName())); } } @@ -154,8 +176,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio eventToSupply.getClass(), this)); } + realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); + if (suppliedEvent != null) { - realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); return suppliedEvent; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java index 17891add76d..156c8891710 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java @@ -102,8 +102,9 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage)); } + realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()); + if (suppliedEvent != null) { - realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()); return suppliedEvent; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java index 5e97a338df1..cae2e6f3c1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java @@ -102,8 +102,9 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); } + realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName()); + if (suppliedEvent != null) { - realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName()); return suppliedEvent; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java index 9c6d97901aa..bb5ecf89b6b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java @@ -20,80 +20,56 @@ package org.apache.iotdb.db.pipe.extractor.realtime.assigner; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import 
com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.util.DaemonThreadFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ThreadFactory; - -public class DisruptorQueue<E> { - - private Disruptor<Container<E>> disruptor; - private RingBuffer<Container<E>> ringBuffer; - - private DisruptorQueue() {} +public class DisruptorQueue { + + private final Disruptor<EventContainer> disruptor; + private final RingBuffer<EventContainer> ringBuffer; + + public DisruptorQueue(EventHandler<PipeRealtimeEvent> eventHandler) { + disruptor = + new Disruptor<>( + EventContainer::new, + PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize(), + DaemonThreadFactory.INSTANCE, // TODO + ProducerType.MULTI, + new BlockingWaitStrategy()); + disruptor.handleEventsWith( + (container, sequence, endOfBatch) -> + eventHandler.onEvent(container.getEvent(), sequence, endOfBatch)); + disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler()); + + ringBuffer = disruptor.start(); + } - public void publish(E obj) { - ringBuffer.publishEvent((container, sequence, o) -> container.setObj(o), obj); + public void publish(PipeRealtimeEvent event) { + ringBuffer.publishEvent((container, sequence, o) -> container.setEvent(event), event); } public void clear() { disruptor.halt(); } - public static class Builder<E> { - private int ringBufferSize = - PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize(); - private ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE; - private ProducerType producerType = ProducerType.MULTI; - private WaitStrategy waitStrategy = new BlockingWaitStrategy(); - private final List<EventHandler<E>> handlers = new ArrayList<>(); - - public Builder<E> 
setProducerType(ProducerType producerType) { - this.producerType = producerType; - return this; - } - - public Builder<E> addEventHandler(EventHandler<E> eventHandler) { - this.handlers.add(eventHandler); - return this; - } - - public DisruptorQueue<E> build() { - DisruptorQueue<E> disruptorQueue = new DisruptorQueue<>(); - disruptorQueue.disruptor = - new Disruptor<>( - Container::new, ringBufferSize, threadFactory, producerType, waitStrategy); - for (EventHandler<E> handler : handlers) { - disruptorQueue.disruptor.handleEventsWith( - (container, sequence, endOfBatch) -> - handler.onEvent(container.getObj(), sequence, endOfBatch)); - } - disruptorQueue.disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler()); - disruptorQueue.disruptor.start(); - disruptorQueue.ringBuffer = disruptorQueue.disruptor.getRingBuffer(); - return disruptorQueue; - } - } + private static class EventContainer { - private static class Container<E> { - private E obj; + private PipeRealtimeEvent event; - private Container() {} + private EventContainer() {} - public E getObj() { - return obj; + public PipeRealtimeEvent getEvent() { + return event; } - public void setObj(E obj) { - this.obj = obj; + public void setEvent(PipeRealtimeEvent event) { + this.event = event; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java index d5ca1a10b90..ee569e9e4ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java @@ -24,23 +24,17 @@ import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtract import org.apache.iotdb.db.pipe.extractor.realtime.matcher.CachedSchemaPatternMatcher; 
import org.apache.iotdb.db.pipe.extractor.realtime.matcher.PipeDataRegionMatcher; -import com.lmax.disruptor.dsl.ProducerType; - public class PipeDataRegionAssigner { /** The matcher is used to match the event with the extractor based on the pattern. */ private final PipeDataRegionMatcher matcher; /** The disruptor is used to assign the event to the extractor. */ - private final DisruptorQueue<PipeRealtimeEvent> disruptor; + private final DisruptorQueue disruptor; public PipeDataRegionAssigner() { this.matcher = new CachedSchemaPatternMatcher(); - this.disruptor = - new DisruptorQueue.Builder<PipeRealtimeEvent>() - .setProducerType(ProducerType.SINGLE) - .addEventHandler(this::assignToExtractor) - .build(); + this.disruptor = new DisruptorQueue(this::assignToExtractor); } public void publishToAssign(PipeRealtimeEvent event) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java index 085c71d5f9c..358e319dafa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java @@ -162,4 +162,8 @@ public abstract class PipeWALResource implements Closeable { referenceCount.set(0); } + + public int getReferenceCount() { + return referenceCount.get(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java index 64cd83f1a67..afbe6931533 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java @@ -24,6 +24,9 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import 
org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.Iterator; import java.util.Map; @@ -34,6 +37,8 @@ import java.util.concurrent.locks.ReentrantLock; public abstract class PipeWALResourceManager { + private static final Logger LOGGER = LoggerFactory.getLogger(PipeWALResourceManager.class); + protected final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap; private static final int SEGMENT_LOCK_COUNT = 32; @@ -55,7 +60,7 @@ public abstract class PipeWALResourceManager { ScheduledExecutorUtil.safelyScheduleWithFixedDelay( PIPE_WAL_RESOURCE_TTL_CHECKER, () -> { - Iterator<Map.Entry<Long, PipeWALResource>> iterator = + final Iterator<Map.Entry<Long, PipeWALResource>> iterator = memtableIdToPipeWALResourceMap.entrySet().iterator(); while (iterator.hasNext()) { final Map.Entry<Long, PipeWALResource> entry = iterator.next(); @@ -66,6 +71,11 @@ public abstract class PipeWALResourceManager { try { if (entry.getValue().invalidateIfPossible()) { iterator.remove(); + } else { + LOGGER.info( + "WAL (memtableId {}) is still referenced {} times", + entry.getKey(), + entry.getValue().getReferenceCount()); } } finally { lock.unlock();
