This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new b89bf1f0166 Pipe: fix the reference count leak of events published in
the assigner during restart (#14308) (#14342)
b89bf1f0166 is described below
commit b89bf1f0166539369b5b5a7f229f7ac6cb6dc92b
Author: VGalaxies <[email protected]>
AuthorDate: Fri Dec 6 14:53:35 2024 +0800
Pipe: fix the reference count leak of events published in the assigner
during restart (#14308) (#14342)
---
.../realtime/assigner/DisruptorQueue.java | 45 ++++++++---------
.../realtime/assigner/PipeDataRegionAssigner.java | 59 ++++++++++++++++------
2 files changed, 64 insertions(+), 40 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
index 0a576142a39..6ea51fbe4b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
@@ -22,10 +22,8 @@ package
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
import org.apache.iotdb.commons.concurrent.IoTDBDaemonThreadFactory;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
-import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -35,6 +33,8 @@ import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
+import java.util.function.Consumer;
+
import static
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR;
public class DisruptorQueue {
@@ -46,9 +46,11 @@ public class DisruptorQueue {
private final Disruptor<EventContainer> disruptor;
private final RingBuffer<EventContainer> ringBuffer;
- private final PipeEventCounter eventCounter = new
PipeDataRegionEventCounter();
+ private volatile boolean isClosed = false;
- public DisruptorQueue(final EventHandler<PipeRealtimeEvent> eventHandler) {
+ public DisruptorQueue(
+ final EventHandler<PipeRealtimeEvent> eventHandler,
+ final Consumer<PipeRealtimeEvent> onAssignedHook) {
final PipeConfig config = PipeConfig.getInstance();
final int ringBufferSize =
config.getPipeExtractorAssignerDisruptorRingBufferSize();
final long ringBufferEntrySizeInBytes =
@@ -71,9 +73,9 @@ public class DisruptorQueue {
new BlockingWaitStrategy());
disruptor.handleEventsWith(
(container, sequence, endOfBatch) -> {
- eventHandler.onEvent(container.getEvent(), sequence, endOfBatch);
- EnrichedEvent innerEvent = container.getEvent().getEvent();
- eventCounter.decreaseEventCount(innerEvent);
+ final PipeRealtimeEvent realtimeEvent = container.getEvent();
+ eventHandler.onEvent(realtimeEvent, sequence, endOfBatch);
+ onAssignedHook.accept(realtimeEvent);
});
disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler());
@@ -81,19 +83,24 @@ public class DisruptorQueue {
}
public void publish(final PipeRealtimeEvent event) {
- final EnrichedEvent internalEvent = event.getEvent();
- if (internalEvent instanceof PipeHeartbeatEvent) {
- ((PipeHeartbeatEvent) internalEvent).recordDisruptorSize(ringBuffer);
+ final EnrichedEvent innerEvent = event.getEvent();
+ if (innerEvent instanceof PipeHeartbeatEvent) {
+ ((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer);
}
ringBuffer.publishEvent((container, sequence, o) ->
container.setEvent(event), event);
- eventCounter.increaseEventCount(internalEvent);
}
- public void clear() {
- disruptor.halt();
+ public void shutdown() {
+ isClosed = true;
+ // use shutdown instead of halt to ensure all published events have been handled
+ disruptor.shutdown();
allocatedMemoryBlock.close();
}
+ public boolean isClosed() {
+ return isClosed;
+ }
+
private static class EventContainer {
private PipeRealtimeEvent event;
@@ -108,16 +115,4 @@ public class DisruptorQueue {
this.event = event;
}
}
-
- public int getTabletInsertionEventCount() {
- return eventCounter.getTabletInsertionEventCount();
- }
-
- public int getTsFileInsertionEventCount() {
- return eventCounter.getTsFileInsertionEventCount();
- }
-
- public int getPipeHeartbeatEventCount() {
- return eventCounter.getPipeHeartbeatEventCount();
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index d3a7f44e458..c5f08634df4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
+import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
@@ -32,6 +33,7 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.CachedSchemaPatternMatcher;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.PipeDataRegionMatcher;
import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics;
+import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
@@ -63,13 +65,15 @@ public class PipeDataRegionAssigner implements Closeable {
private final AtomicReference<ProgressIndex>
maxProgressIndexForTsFileInsertionEvent =
new AtomicReference<>(MinimumProgressIndex.INSTANCE);
+ private final PipeEventCounter eventCounter = new
PipeDataRegionEventCounter();
+
public String getDataRegionId() {
return dataRegionId;
}
public PipeDataRegionAssigner(final String dataRegionId) {
this.matcher = new CachedSchemaPatternMatcher();
- this.disruptor = new DisruptorQueue(this::assignToExtractor);
+ this.disruptor = new DisruptorQueue(this::assignToExtractor,
this::onAssignedHook);
this.dataRegionId = dataRegionId;
PipeAssignerMetrics.getInstance().register(this);
}
@@ -81,19 +85,44 @@ public class PipeDataRegionAssigner implements Closeable {
return;
}
- disruptor.publish(event);
+ final EnrichedEvent innerEvent = event.getEvent();
+ eventCounter.increaseEventCount(innerEvent);
+ if (innerEvent instanceof PipeHeartbeatEvent) {
+ ((PipeHeartbeatEvent) innerEvent).onPublished();
+ }
+
+ if (!disruptor.isClosed()) {
+ disruptor.publish(event);
+ } else {
+ onAssignedHook(event);
+ }
+ }
+
+ private void onAssignedHook(final PipeRealtimeEvent realtimeEvent) {
+ realtimeEvent.gcSchemaInfo();
+
realtimeEvent.decreaseReferenceCount(PipeDataRegionAssigner.class.getName(),
false);
- if (event.getEvent() instanceof PipeHeartbeatEvent) {
- ((PipeHeartbeatEvent) event.getEvent()).onPublished();
+ final EnrichedEvent innerEvent = realtimeEvent.getEvent();
+ eventCounter.decreaseEventCount(innerEvent);
+ if (innerEvent instanceof PipeHeartbeatEvent) {
+ ((PipeHeartbeatEvent) innerEvent).onAssigned();
}
}
- public void assignToExtractor(
+ private void assignToExtractor(
final PipeRealtimeEvent event, final long sequence, final boolean
endOfBatch) {
+ if (disruptor.isClosed()) {
+ return;
+ }
+
matcher
.match(event)
.forEach(
extractor -> {
+ if (disruptor.isClosed()) {
+ return;
+ }
+
if (event.getEvent().isGeneratedByPipe() &&
!extractor.isForwardingPipeRequests()) {
// The frequency of progress reports is limited by the counter, while progress
// reports to TsFileInsertionEvent are not limited.
@@ -148,13 +177,7 @@ public class PipeDataRegionAssigner implements Closeable {
return;
}
extractor.extract(copiedEvent);
-
- if (innerEvent instanceof PipeHeartbeatEvent) {
- ((PipeHeartbeatEvent) innerEvent).onAssigned();
- }
});
- event.gcSchemaInfo();
- event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName(),
false);
}
private void bindOrUpdateProgressIndexForTsFileInsertionEvent(
@@ -196,18 +219,24 @@ public class PipeDataRegionAssigner implements Closeable {
public void close() {
PipeAssignerMetrics.getInstance().deregister(dataRegionId);
matcher.clear();
- disruptor.clear();
+
+ final long startTime = System.currentTimeMillis();
+ disruptor.shutdown();
+ LOGGER.info(
+ "Pipe: Assigner on data region {} shutdown internal disruptor within
{} ms",
+ dataRegionId,
+ System.currentTimeMillis() - startTime);
}
public int getTabletInsertionEventCount() {
- return disruptor.getTabletInsertionEventCount();
+ return eventCounter.getTabletInsertionEventCount();
}
public int getTsFileInsertionEventCount() {
- return disruptor.getTsFileInsertionEventCount();
+ return eventCounter.getTsFileInsertionEventCount();
}
public int getPipeHeartbeatEventCount() {
- return disruptor.getPipeHeartbeatEventCount();
+ return eventCounter.getPipeHeartbeatEventCount();
}
}