This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f8c782ec037 Pipe/Subscription: use heartbeat only extractor if enable 
snapshot mode (#12904)
f8c782ec037 is described below

commit f8c782ec0373c973f0926ce3eb5266e8615a37cc
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Jul 12 19:21:29 2024 +0800

    Pipe/Subscription: use heartbeat only extractor if enable snapshot mode 
(#12904)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../it/dual/IoTDBSubscriptionTopicIT.java          | 15 +++++-
 .../dataregion/IoTDBDataRegionExtractor.java       | 27 ++++++++--
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  6 +--
 ... PipeRealtimeDataRegionHeartbeatExtractor.java} | 58 ++++++++++++----------
 4 files changed, 72 insertions(+), 34 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 04a1f33772f..ce8b46f2aea 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -552,7 +552,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
   }
 
   private void testTopicWithSnapshotModeTemplate(final String topicFormat) 
throws Exception {
-    // Insert some historical data
+    // Insert some historical data before subscription
     try (final ISession session = senderEnv.getSessionConnection()) {
       for (int i = 0; i < 100; ++i) {
         session.executeNonQueryStatement(
@@ -596,6 +596,19 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                       .buildPullConsumer()) {
                 consumer.open();
                 consumer.subscribe(topicName);
+
+                // Insert some data after subscription
+                try (final ISession session = 
senderEnv.getSessionConnection()) {
+                  for (int i = 100; i < 200; ++i) {
+                    session.executeNonQueryStatement(
+                        String.format("insert into root.db.d1(time, s1) values 
(%s, 1)", i));
+                  }
+                  session.executeNonQueryStatement("flush");
+                } catch (final Exception e) {
+                  e.printStackTrace();
+                  fail(e.getMessage());
+                }
+
                 while (!isClosed.get()) {
                   LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
                   final List<SubscriptionMessage> messages =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index d89b223df5b..0b447ffe7a4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -30,7 +30,7 @@ import 
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionTsFileExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
-import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionFakeExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHeartbeatExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
@@ -59,6 +59,10 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
@@ -78,6 +82,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
@@ -236,17 +241,31 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
 
   private void constructRealtimeExtractor(final PipeParameters parameters)
       throws IllegalPathException {
-    // Enable realtime extractor by default
+    // Use heartbeat only extractor if disable realtime extractor
     if (!parameters.getBooleanOrDefault(
         Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
         EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
-      realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
+      realtimeExtractor = new PipeRealtimeDataRegionHeartbeatExtractor();
       LOGGER.info(
-          "Pipe: '{}' is set to false, use fake realtime extractor.",
+          "Pipe: '{}' is set to false, use heartbeat realtime extractor.",
           EXTRACTOR_REALTIME_ENABLE_KEY);
       return;
     }
 
+    // Use heartbeat only extractor if enable snapshot mode
+    final String extractorModeValue =
+        parameters.getStringOrDefault(
+            Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
+    if (extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE)
+        || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)) 
{
+      realtimeExtractor = new PipeRealtimeDataRegionHeartbeatExtractor();
+      LOGGER.info(
+          "Pipe: '{}' is set to {}, use heartbeat realtime extractor.",
+          EXTRACTOR_MODE_KEY,
+          EXTRACTOR_MODE_SNAPSHOT_VALUE);
+      return;
+    }
+
     // Use hybrid mode by default
     if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
       checkWalEnable(parameters);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index f89d85f0d97..bbf34892e42 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -616,10 +616,10 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     return event;
   }
 
+  @Override
   public synchronized boolean hasConsumedAll() {
-    // If the pendingQueue is null when the function is called, it
-    // implies that the extractor only extracts deletion thus the
-    // Historical event has nothing to consume
+    // If the pendingQueue is null when the function is called, it implies 
that the extractor only
+    // extracts deletion thus the historical event has nothing to consume.
     return Objects.isNull(pendingQueue)
         || pendingQueue.isEmpty()
             && (!shouldTerminatePipeOnAllHistoricalEventsConsumed || 
isTerminateSignalSent);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionFakeExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java
similarity index 51%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionFakeExtractor.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java
index 5c639e7a4d0..39a64989048 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionFakeExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHeartbeatExtractor.java
@@ -19,38 +19,45 @@
 
 package org.apache.iotdb.db.pipe.extractor.dataregion.realtime;
 
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
-public class PipeRealtimeDataRegionFakeExtractor extends 
PipeRealtimeDataRegionExtractor {
+public class PipeRealtimeDataRegionHeartbeatExtractor extends 
PipeRealtimeDataRegionExtractor {
 
   @Override
-  public void validate(PipeParameterValidator validator) {
-    // do nothing
-  }
+  public Event supply() {
+    PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) 
pendingQueue.directPoll();
 
-  @Override
-  public void customize(
-      PipeParameters parameters, PipeExtractorRuntimeConfiguration 
configuration) {
-    // do nothing
-  }
+    while (realtimeEvent != null) {
+      Event suppliedEvent = null;
 
-  @Override
-  public void start() {
-    // do nothing
-  }
+      // only supply PipeHeartbeatEvent
+      if (realtimeEvent.getEvent() instanceof PipeHeartbeatEvent) {
+        suppliedEvent = supplyHeartbeat(realtimeEvent);
+      }
+
+      realtimeEvent.decreaseReferenceCount(
+          PipeRealtimeDataRegionHeartbeatExtractor.class.getName(), false);
+
+      if (suppliedEvent != null) {
+        return suppliedEvent;
+      }
+
+      realtimeEvent = (PipeRealtimeEvent) pendingQueue.directPoll();
+    }
 
-  @Override
-  public Event supply() {
     return null;
   }
 
   @Override
-  protected void doExtract(PipeRealtimeEvent event) {
-    // do nothing
+  protected void doExtract(final PipeRealtimeEvent event) {
+    // only extract PipeHeartbeatEvent
+    if (event.getEvent() instanceof PipeHeartbeatEvent) {
+      extractHeartbeat(event);
+    } else {
+      
event.decreaseReferenceCount(PipeRealtimeDataRegionHeartbeatExtractor.class.getName(),
 false);
+    }
   }
 
   @Override
@@ -63,13 +70,12 @@ public class PipeRealtimeDataRegionFakeExtractor extends 
PipeRealtimeDataRegionE
     return false;
   }
 
-  @Override
-  public void close() {
-    // do nothing
-  }
-
   @Override
   public String toString() {
-    return "PipeRealtimeDataRegionFakeExtractor{}";
+    return "PipeRealtimeDataRegionHeartbeatExtractor{"
+        + "dataRegionId='"
+        + dataRegionId
+        + '\''
+        + '}';
   }
 }

Reply via email to