This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f8c782ec037 Pipe/Subscription: use heartbeat only extractor if enable snapshot mode (#12904)
f8c782ec037 is described below
commit f8c782ec0373c973f0926ce3eb5266e8615a37cc
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Jul 12 19:21:29 2024 +0800
Pipe/Subscription: use heartbeat only extractor if enable snapshot mode (#12904)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../it/dual/IoTDBSubscriptionTopicIT.java | 15 +++++-
.../dataregion/IoTDBDataRegionExtractor.java | 27 ++++++++--
.../PipeHistoricalDataRegionTsFileExtractor.java | 6 +--
... PipeRealtimeDataRegionHeartbeatExtractor.java} | 58 ++++++++++++----------
4 files changed, 72 insertions(+), 34 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 04a1f33772f..ce8b46f2aea 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -552,7 +552,7 @@ public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionDualIT {
}
private void testTopicWithSnapshotModeTemplate(final String topicFormat)
throws Exception {
- // Insert some historical data
+ // Insert some historical data before subscription
try (final ISession session = senderEnv.getSessionConnection()) {
for (int i = 0; i < 100; ++i) {
session.executeNonQueryStatement(
@@ -596,6 +596,19 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
.buildPullConsumer()) {
consumer.open();
consumer.subscribe(topicName);
+
+ // Insert some data after subscription
+ try (final ISession session =
senderEnv.getSessionConnection()) {
+ for (int i = 100; i < 200; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
while (!isClosed.get()) {
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
// wait some time
final List<SubscriptionMessage> messages =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index d89b223df5b..0b447ffe7a4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -30,7 +30,7 @@ import
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionTsFileExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
-import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionFakeExtractor;
+import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHeartbeatExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
@@ -59,6 +59,10 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
@@ -78,6 +82,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
@@ -236,17 +241,31 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
private void constructRealtimeExtractor(final PipeParameters parameters)
throws IllegalPathException {
- // Enable realtime extractor by default
+ // Use heartbeat only extractor if disable realtime extractor
if (!parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
- realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
+ realtimeExtractor = new PipeRealtimeDataRegionHeartbeatExtractor();
LOGGER.info(
- "Pipe: '{}' is set to false, use fake realtime extractor.",
+ "Pipe: '{}' is set to false, use heartbeat realtime extractor.",
EXTRACTOR_REALTIME_ENABLE_KEY);
return;
}
+ // Use heartbeat only extractor if enable snapshot mode
+ final String extractorModeValue =
+ parameters.getStringOrDefault(
+ Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
EXTRACTOR_MODE_DEFAULT_VALUE);
+ if (extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE)
+ || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE))
{
+ realtimeExtractor = new PipeRealtimeDataRegionHeartbeatExtractor();
+ LOGGER.info(
+ "Pipe: '{}' is set to {}, use heartbeat realtime extractor.",
+ EXTRACTOR_MODE_KEY,
+ EXTRACTOR_MODE_SNAPSHOT_VALUE);
+ return;
+ }
+
// Use hybrid mode by default
if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY)) {
checkWalEnable(parameters);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index f89d85f0d97..bbf34892e42 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -616,10 +616,10 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
return event;
}
+ @Override
public synchronized boolean hasConsumedAll() {
- // If the pendingQueue is null when the function is called, it
- // implies that the extractor only extracts deletion thus the
- // Historical event has nothing to consume
+ // If the pendingQueue is null when the function is called, it implies
that the extractor only
+ // extracts deletion thus the historical event has nothing to consume.
return Objects.isNull(pendingQueue)
|| pendingQueue.isEmpty()
&& (!shouldTerminatePipeOnAllHistoricalEventsConsumed ||
isTerminateSignalSent);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionFakeExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java
similarity index 51%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionFakeExtractor.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java
index 5c639e7a4d0..39a64989048 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionFakeExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java
@@ -19,38 +19,45 @@
package org.apache.iotdb.db.pipe.extractor.dataregion.realtime;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
-import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
-public class PipeRealtimeDataRegionFakeExtractor extends
PipeRealtimeDataRegionExtractor {
+public class PipeRealtimeDataRegionHeartbeatExtractor extends
PipeRealtimeDataRegionExtractor {
@Override
- public void validate(PipeParameterValidator validator) {
- // do nothing
- }
+ public Event supply() {
+ PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent)
pendingQueue.directPoll();
- @Override
- public void customize(
- PipeParameters parameters, PipeExtractorRuntimeConfiguration
configuration) {
- // do nothing
- }
+ while (realtimeEvent != null) {
+ Event suppliedEvent = null;
- @Override
- public void start() {
- // do nothing
- }
+ // only supply PipeHeartbeatEvent
+ if (realtimeEvent.getEvent() instanceof PipeHeartbeatEvent) {
+ suppliedEvent = supplyHeartbeat(realtimeEvent);
+ }
+
+ realtimeEvent.decreaseReferenceCount(
+ PipeRealtimeDataRegionHeartbeatExtractor.class.getName(), false);
+
+ if (suppliedEvent != null) {
+ return suppliedEvent;
+ }
+
+ realtimeEvent = (PipeRealtimeEvent) pendingQueue.directPoll();
+ }
- @Override
- public Event supply() {
return null;
}
@Override
- protected void doExtract(PipeRealtimeEvent event) {
- // do nothing
+ protected void doExtract(final PipeRealtimeEvent event) {
+ // only extract PipeHeartbeatEvent
+ if (event.getEvent() instanceof PipeHeartbeatEvent) {
+ extractHeartbeat(event);
+ } else {
+
event.decreaseReferenceCount(PipeRealtimeDataRegionHeartbeatExtractor.class.getName(),
false);
+ }
}
@Override
@@ -63,13 +70,12 @@ public class PipeRealtimeDataRegionFakeExtractor extends
PipeRealtimeDataRegionE
return false;
}
- @Override
- public void close() {
- // do nothing
- }
-
@Override
public String toString() {
- return "PipeRealtimeDataRegionFakeExtractor{}";
+ return "PipeRealtimeDataRegionHeartbeatExtractor{"
+ + "dataRegionId='"
+ + dataRegionId
+ + '\''
+ + '}';
}
}