This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cb0765836fd Pipe: Reported the progress of the non-forwarding events
(#13008)
cb0765836fd is described below
commit cb0765836fd986fca7e09db60724879987d9c22c
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 24 18:42:31 2024 +0800
Pipe: Reported the progress of the non-forwarding events (#13008)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../event/realtime/PipeRealtimeEventFactory.java | 15 ++++++++++----
.../realtime/PipeRealtimeDataRegionExtractor.java | 16 ++++++++++-----
.../PipeRealtimeDataRegionHeartbeatExtractor.java | 3 +++
.../PipeRealtimeDataRegionHybridExtractor.java | 8 +++++---
.../PipeRealtimeDataRegionLogExtractor.java | 8 +++++---
.../PipeRealtimeDataRegionTsFileExtractor.java | 8 +++++---
.../realtime/assigner/PipeDataRegionAssigner.java | 24 +++++++++++++++++-----
7 files changed, 59 insertions(+), 23 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 94793b74bc1..60786cfed25 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.event.realtime;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
@@ -34,13 +35,15 @@ public class PipeRealtimeEventFactory {
private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new
TsFileEpochManager();
public static PipeRealtimeEvent createRealtimeEvent(
- TsFileResource resource, boolean isLoaded, boolean isGeneratedByPipe) {
+ final TsFileResource resource, final boolean isLoaded, final boolean
isGeneratedByPipe) {
return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
new PipeTsFileInsertionEvent(resource, isLoaded, isGeneratedByPipe),
resource);
}
public static PipeRealtimeEvent createRealtimeEvent(
- WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource
resource) {
+ final WALEntryHandler walEntryHandler,
+ final InsertNode insertNode,
+ final TsFileResource resource) {
return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
new PipeInsertNodeTabletInsertionEvent(
walEntryHandler,
@@ -53,16 +56,20 @@ public class PipeRealtimeEventFactory {
}
public static PipeRealtimeEvent createRealtimeEvent(
- String dataRegionId, boolean shouldPrintMessage) {
+ final String dataRegionId, final boolean shouldPrintMessage) {
return new PipeRealtimeEvent(
new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null,
null);
}
- public static PipeRealtimeEvent createRealtimeEvent(DeleteDataNode node) {
+ public static PipeRealtimeEvent createRealtimeEvent(final DeleteDataNode
node) {
return new PipeRealtimeEvent(
new PipeSchemaRegionWritePlanEvent(node, node.isGeneratedByPipe()),
null, null, null);
}
+ public static PipeRealtimeEvent createRealtimeEvent(final
ProgressReportEvent event) {
+ return new PipeRealtimeEvent(event, null, null, null);
+ }
+
private PipeRealtimeEventFactory() {
// factory class, do not instantiate
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index 138947af18d..55faf2db59c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -287,6 +288,12 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
* @param event the {@link Event} from the {@link StorageEngine}
*/
public final void extract(final PipeRealtimeEvent event) {
+ // The progress report event shall be directly extracted
+ if (event.getEvent() instanceof ProgressReportEvent) {
+ extractDirectly(event);
+ return;
+ }
+
if (isDbNameCoveredByPattern) {
event.skipParsingPattern();
}
@@ -369,14 +376,13 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
}
}
- protected void extractDeletion(final PipeRealtimeEvent event) {
+ protected void extractDirectly(final PipeRealtimeEvent event) {
if (!pendingQueue.waitedOffer(event)) {
// This would not happen, but just in case.
// Pending is unbounded, so it should never reach capacity.
final String errorMessage =
String.format(
- "extract: pending queue of %s %s "
- + "has reached capacity, discard deletion event %s",
+ "extract: pending queue of %s %s " + "has reached capacity,
discard event %s",
this.getClass().getSimpleName(), this, event);
LOGGER.error(errorMessage);
PipeDataNodeAgent.runtime()
@@ -404,7 +410,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
}
}
- protected Event supplyDeletion(final PipeRealtimeEvent event) {
+ protected Event supplyDirectly(final PipeRealtimeEvent event) {
if
(event.increaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName()))
{
return event.getEvent();
} else {
@@ -413,7 +419,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
// event and report the exception to PipeRuntimeAgent.
final String errorMessage =
String.format(
- "TsFile Event %s can not be supplied because "
+ "Event %s can not be supplied because "
+ "the reference count can not be increased, "
+ "the data represented by this event is lost",
event.getEvent());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java
index 39a64989048..1df62eecc92 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.extractor.dataregion.realtime;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.pipe.api.event.Event;
@@ -35,6 +36,8 @@ public class PipeRealtimeDataRegionHeartbeatExtractor extends
PipeRealtimeDataRe
// only supply PipeHeartbeatEvent
if (realtimeEvent.getEvent() instanceof PipeHeartbeatEvent) {
suppliedEvent = supplyHeartbeat(realtimeEvent);
+ } else if (realtimeEvent.getEvent() instanceof ProgressReportEvent) {
+ suppliedEvent = supplyDirectly(realtimeEvent);
}
realtimeEvent.decreaseReferenceCount(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 15a020cd577..ced920f378c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.pipe.extractor.dataregion.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -57,7 +58,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
} else if (eventToExtract instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
} else if (eventToExtract instanceof PipeSchemaRegionWritePlanEvent) {
- extractDeletion(event);
+ extractDirectly(event);
} else {
throw new UnsupportedOperationException(
String.format(
@@ -259,8 +260,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
suppliedEvent = supplyTsFileInsertion(realtimeEvent);
} else if (eventToSupply instanceof PipeHeartbeatEvent) {
suppliedEvent = supplyHeartbeat(realtimeEvent);
- } else if (eventToSupply instanceof PipeSchemaRegionWritePlanEvent) {
- suppliedEvent = supplyDeletion(realtimeEvent);
+ } else if (eventToSupply instanceof PipeSchemaRegionWritePlanEvent
+ || eventToSupply instanceof ProgressReportEvent) {
+ suppliedEvent = supplyDirectly(realtimeEvent);
} else {
throw new UnsupportedOperationException(
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionLogExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionLogExtractor.java
index 40d8616a181..4b300355c80 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionLogExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.extractor.dataregion.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
@@ -49,7 +50,7 @@ public class PipeRealtimeDataRegionLogExtractor extends
PipeRealtimeDataRegionEx
} else if (eventToExtract instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
} else if (eventToExtract instanceof PipeSchemaRegionWritePlanEvent) {
- extractDeletion(event);
+ extractDirectly(event);
} else {
throw new UnsupportedOperationException(
String.format(
@@ -130,8 +131,9 @@ public class PipeRealtimeDataRegionLogExtractor extends
PipeRealtimeDataRegionEx
if (realtimeEvent.getEvent() instanceof PipeHeartbeatEvent) {
suppliedEvent = supplyHeartbeat(realtimeEvent);
- } else if (realtimeEvent.getEvent() instanceof
PipeSchemaRegionWritePlanEvent) {
- suppliedEvent = supplyDeletion(realtimeEvent);
+ } else if (realtimeEvent.getEvent() instanceof
PipeSchemaRegionWritePlanEvent
+ || realtimeEvent.getEvent() instanceof ProgressReportEvent) {
+ suppliedEvent = supplyDirectly(realtimeEvent);
} else if (realtimeEvent.increaseReferenceCount(
PipeRealtimeDataRegionLogExtractor.class.getName())) {
suppliedEvent = realtimeEvent.getEvent();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index 57e00fa51c1..8072499b3da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.extractor.dataregion.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
@@ -44,7 +45,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends
PipeRealtimeDataRegio
}
if (event.getEvent() instanceof PipeSchemaRegionWritePlanEvent) {
- extractDeletion(event);
+ extractDirectly(event);
return;
}
@@ -91,8 +92,9 @@ public class PipeRealtimeDataRegionTsFileExtractor extends
PipeRealtimeDataRegio
if (realtimeEvent.getEvent() instanceof PipeHeartbeatEvent) {
suppliedEvent = supplyHeartbeat(realtimeEvent);
- } else if (realtimeEvent.getEvent() instanceof
PipeSchemaRegionWritePlanEvent) {
- suppliedEvent = supplyDeletion(realtimeEvent);
+ } else if (realtimeEvent.getEvent() instanceof
PipeSchemaRegionWritePlanEvent
+ || realtimeEvent.getEvent() instanceof ProgressReportEvent) {
+ suppliedEvent = supplyDirectly(realtimeEvent);
} else if (realtimeEvent.increaseReferenceCount(
PipeRealtimeDataRegionTsFileExtractor.class.getName())) {
suppliedEvent = realtimeEvent.getEvent();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 16f9ebf61d9..1042b1924a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -20,9 +20,11 @@
package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics;
import org.apache.iotdb.db.pipe.pattern.CachedSchemaPatternMatcher;
@@ -47,14 +49,14 @@ public class PipeDataRegionAssigner implements Closeable {
return dataRegionId;
}
- public PipeDataRegionAssigner(String dataRegionId) {
+ public PipeDataRegionAssigner(final String dataRegionId) {
this.matcher = new CachedSchemaPatternMatcher();
this.disruptor = new DisruptorQueue(this::assignToExtractor);
this.dataRegionId = dataRegionId;
PipeAssignerMetrics.getInstance().register(this);
}
- public void publishToAssign(PipeRealtimeEvent event) {
+ public void publishToAssign(final PipeRealtimeEvent event) {
event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
disruptor.publish(event);
@@ -64,12 +66,24 @@ public class PipeDataRegionAssigner implements Closeable {
}
}
- public void assignToExtractor(PipeRealtimeEvent event, long sequence,
boolean endOfBatch) {
+ public void assignToExtractor(
+ final PipeRealtimeEvent event, final long sequence, final boolean
endOfBatch) {
matcher
.match(event)
.forEach(
extractor -> {
if (event.getEvent().isGeneratedByPipe() &&
!extractor.isForwardingPipeRequests()) {
+ final ProgressReportEvent reportEvent =
+ new ProgressReportEvent(
+ extractor.getPipeName(),
+ extractor.getCreationTime(),
+ extractor.getPipeTaskMeta(),
+ extractor.getPipePattern(),
+ extractor.getRealtimeDataExtractionStartTime(),
+ extractor.getRealtimeDataExtractionEndTime());
+ reportEvent.bindProgressIndex(event.getProgressIndex());
+
reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
+
extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
return;
}
@@ -98,11 +112,11 @@ public class PipeDataRegionAssigner implements Closeable {
event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName(),
false);
}
- public void startAssignTo(PipeRealtimeDataRegionExtractor extractor) {
+ public void startAssignTo(final PipeRealtimeDataRegionExtractor extractor) {
matcher.register(extractor);
}
- public void stopAssignTo(PipeRealtimeDataRegionExtractor extractor) {
+ public void stopAssignTo(final PipeRealtimeDataRegionExtractor extractor) {
matcher.deregister(extractor);
}