This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d78326a01c5f9a6eab7c825e64ac4954ec9865b7 Author: Caideyipi <[email protected]> AuthorDate: Wed Jul 24 18:42:31 2024 +0800 Pipe: Reported the progress of the non-forwarding events (#13008) Co-authored-by: Steve Yurong Su <[email protected]> (cherry picked from commit cb0765836fd986fca7e09db60724879987d9c22c) --- .../event/realtime/PipeRealtimeEventFactory.java | 15 ++++++++++---- .../realtime/PipeRealtimeDataRegionExtractor.java | 16 ++++++++++----- .../PipeRealtimeDataRegionHeartbeatExtractor.java | 3 +++ .../PipeRealtimeDataRegionHybridExtractor.java | 8 +++++--- .../PipeRealtimeDataRegionLogExtractor.java | 8 +++++--- .../PipeRealtimeDataRegionTsFileExtractor.java | 8 +++++--- .../realtime/assigner/PipeDataRegionAssigner.java | 24 +++++++++++++++++----- 7 files changed, 59 insertions(+), 23 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java index 94793b74bc1..60786cfed25 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.event.realtime; +import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; @@ -34,13 +35,15 @@ public class PipeRealtimeEventFactory { private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new TsFileEpochManager(); public static PipeRealtimeEvent createRealtimeEvent( - TsFileResource resource, boolean isLoaded, boolean isGeneratedByPipe) { + final TsFileResource resource, final boolean isLoaded, final boolean isGeneratedByPipe) { return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent( new PipeTsFileInsertionEvent(resource, isLoaded, isGeneratedByPipe), resource); } public static PipeRealtimeEvent createRealtimeEvent( - WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource resource) { + final WALEntryHandler walEntryHandler, + final InsertNode insertNode, + final TsFileResource resource) { return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent( new PipeInsertNodeTabletInsertionEvent( walEntryHandler, @@ -53,16 +56,20 @@ public class PipeRealtimeEventFactory { } public static PipeRealtimeEvent createRealtimeEvent( - String dataRegionId, boolean shouldPrintMessage) { + final String dataRegionId, final boolean shouldPrintMessage) { return new PipeRealtimeEvent( new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null, null); } - public static PipeRealtimeEvent createRealtimeEvent(DeleteDataNode node) { + public static PipeRealtimeEvent createRealtimeEvent(final DeleteDataNode node) { return new PipeRealtimeEvent( new PipeSchemaRegionWritePlanEvent(node, node.isGeneratedByPipe()), null, null, null); } + public static PipeRealtimeEvent createRealtimeEvent(final ProgressReportEvent event) { + return new PipeRealtimeEvent(event, null, null, null); + } + private PipeRealtimeEventFactory() { // factory class, do not instantiate } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java index 138947af18d..55faf2db59c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.commons.pipe.pattern.PipePattern; import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; @@ -287,6 +288,12 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { * @param event the {@link Event} from the {@link StorageEngine} */ public final void extract(final PipeRealtimeEvent event) { + // The progress report event shall be directly extracted + if (event.getEvent() instanceof ProgressReportEvent) { + extractDirectly(event); + return; + } + if (isDbNameCoveredByPattern) { event.skipParsingPattern(); } @@ -369,14 +376,13 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { } } - protected void extractDeletion(final PipeRealtimeEvent event) { + protected void extractDirectly(final PipeRealtimeEvent event) { if (!pendingQueue.waitedOffer(event)) { // This would not happen, but just in case. // Pending is unbounded, so it should never reach capacity. final String errorMessage = String.format( - "extract: pending queue of %s %s " - + "has reached capacity, discard deletion event %s", + "extract: pending queue of %s %s " + "has reached capacity, discard event %s", this.getClass().getSimpleName(), this, event); LOGGER.error(errorMessage); PipeDataNodeAgent.runtime() @@ -404,7 +410,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { } } - protected Event supplyDeletion(final PipeRealtimeEvent event) { + protected Event supplyDirectly(final PipeRealtimeEvent event) { if (event.increaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName())) { return event.getEvent(); } else { @@ -413,7 +419,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { // event and report the exception to PipeRuntimeAgent. final String errorMessage = String.format( - "TsFile Event %s can not be supplied because " + "Event %s can not be supplied because " + "the reference count can not be increased, " + "the data represented by this event is lost", event.getEvent()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java index 39a64989048..1df62eecc92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime; +import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.pipe.api.event.Event; @@ -35,6 +36,8 @@ public class PipeRealtimeDataRegionHeartbeatExtractor extends PipeRealtimeDataRe // only supply PipeHeartbeatEvent if (realtimeEvent.getEvent() instanceof PipeHeartbeatEvent) { suppliedEvent = supplyHeartbeat(realtimeEvent); + } else if (realtimeEvent.getEvent() instanceof ProgressReportEvent) { + suppliedEvent = supplyDirectly(realtimeEvent); } realtimeEvent.decreaseReferenceCount( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java index 15a020cd577..ced920f378c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; @@ -57,7 +58,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } else if (eventToExtract instanceof PipeHeartbeatEvent) { extractHeartbeat(event); } else if (eventToExtract instanceof PipeSchemaRegionWritePlanEvent) { - extractDeletion(event); + extractDirectly(event); } else { throw new UnsupportedOperationException( String.format( @@ -259,8 +260,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio suppliedEvent = supplyTsFileInsertion(realtimeEvent); } else if (eventToSupply instanceof PipeHeartbeatEvent) { suppliedEvent = supplyHeartbeat(realtimeEvent); - } else if (eventToSupply instanceof PipeSchemaRegionWritePlanEvent) { - suppliedEvent = supplyDeletion(realtimeEvent); + } else if (eventToSupply instanceof PipeSchemaRegionWritePlanEvent + || eventToSupply instanceof ProgressReportEvent) { + suppliedEvent = supplyDirectly(realtimeEvent); } else { throw new UnsupportedOperationException( String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionLogExtractor.java index 40d8616a181..4b300355c80 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionLogExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionLogExtractor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; +import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; @@ -49,7 +50,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx } else if (eventToExtract instanceof PipeHeartbeatEvent) { extractHeartbeat(event); } else if (eventToExtract instanceof PipeSchemaRegionWritePlanEvent) { - extractDeletion(event); + extractDirectly(event); } else { throw new UnsupportedOperationException( String.format( @@ -130,8 +131,9 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx if (realtimeEvent.getEvent() instanceof PipeHeartbeatEvent) { suppliedEvent = supplyHeartbeat(realtimeEvent); - } else if (realtimeEvent.getEvent() instanceof PipeSchemaRegionWritePlanEvent) { - suppliedEvent = supplyDeletion(realtimeEvent); + } else if (realtimeEvent.getEvent() instanceof PipeSchemaRegionWritePlanEvent + || realtimeEvent.getEvent() instanceof ProgressReportEvent) { + suppliedEvent = supplyDirectly(realtimeEvent); } else if (realtimeEvent.increaseReferenceCount( PipeRealtimeDataRegionLogExtractor.class.getName())) { suppliedEvent = realtimeEvent.getEvent(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionTsFileExtractor.java index 57e00fa51c1..8072499b3da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionTsFileExtractor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; +import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; @@ -44,7 +45,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio } if (event.getEvent() instanceof PipeSchemaRegionWritePlanEvent) { - extractDeletion(event); + extractDirectly(event); return; } @@ -91,8 +92,9 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio if (realtimeEvent.getEvent() instanceof PipeHeartbeatEvent) { suppliedEvent = supplyHeartbeat(realtimeEvent); - } else if (realtimeEvent.getEvent() instanceof PipeSchemaRegionWritePlanEvent) { - suppliedEvent = supplyDeletion(realtimeEvent); + } else if (realtimeEvent.getEvent() instanceof PipeSchemaRegionWritePlanEvent + || realtimeEvent.getEvent() instanceof ProgressReportEvent) { + suppliedEvent = supplyDirectly(realtimeEvent); } else if (realtimeEvent.increaseReferenceCount( PipeRealtimeDataRegionTsFileExtractor.class.getName())) { suppliedEvent = realtimeEvent.getEvent(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 16f9ebf61d9..1042b1924a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -20,9 +20,11 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor; import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics; import org.apache.iotdb.db.pipe.pattern.CachedSchemaPatternMatcher; @@ -47,14 +49,14 @@ public class PipeDataRegionAssigner implements Closeable { return dataRegionId; } - public PipeDataRegionAssigner(String dataRegionId) { + public PipeDataRegionAssigner(final String dataRegionId) { this.matcher = new CachedSchemaPatternMatcher(); this.disruptor = new DisruptorQueue(this::assignToExtractor); this.dataRegionId = dataRegionId; PipeAssignerMetrics.getInstance().register(this); } - public void publishToAssign(PipeRealtimeEvent event) { + public void publishToAssign(final PipeRealtimeEvent event) { event.increaseReferenceCount(PipeDataRegionAssigner.class.getName()); disruptor.publish(event); @@ -64,12 +66,24 @@ public class PipeDataRegionAssigner implements Closeable { } } - public void assignToExtractor(PipeRealtimeEvent event, long sequence, boolean endOfBatch) { + public void assignToExtractor( + final PipeRealtimeEvent event, final long sequence, final boolean endOfBatch) { matcher .match(event) .forEach( extractor -> { if (event.getEvent().isGeneratedByPipe() && !extractor.isForwardingPipeRequests()) { + final ProgressReportEvent reportEvent = + new ProgressReportEvent( + extractor.getPipeName(), + extractor.getCreationTime(), + extractor.getPipeTaskMeta(), + extractor.getPipePattern(), + extractor.getRealtimeDataExtractionStartTime(), + extractor.getRealtimeDataExtractionEndTime()); + reportEvent.bindProgressIndex(event.getProgressIndex()); + reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName()); + extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent)); return; } @@ -98,11 +112,11 @@ public class PipeDataRegionAssigner implements Closeable { event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName(), false); } - public void startAssignTo(PipeRealtimeDataRegionExtractor extractor) { + public void startAssignTo(final PipeRealtimeDataRegionExtractor extractor) { matcher.register(extractor); } - public void stopAssignTo(PipeRealtimeDataRegionExtractor extractor) { + public void stopAssignTo(final PipeRealtimeDataRegionExtractor extractor) { matcher.deregister(extractor); }
