This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ed8d539602 [IOTDB-5739] Pipe realtime event process: listener -> 
assigner -> matcher -> collector (#9479)
ed8d539602 is described below

commit ed8d5396025f44d7336eccacfe304265114876e7
Author: yschengzi <[email protected]>
AuthorDate: Thu Apr 27 09:47:17 2023 +0800

    [IOTDB-5739] Pipe realtime event process: listener -> assigner -> matcher 
-> collector (#9479)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../builtin/connector/DoNothingConnector.java      |   6 +-
 .../builtin/processor/DoNothingProcessor.java      |   6 +-
 .../org/apache/iotdb/pipe/api/PipeCollector.java   |  99 +++++++++
 .../org/apache/iotdb/pipe/api/PipeConnector.java   |   6 +-
 .../org/apache/iotdb/pipe/api/PipeProcessor.java   |   6 +-
 .../iotdb/pipe/api/collector/EventCollector.java   |   6 +-
 .../iotdb/pipe/api/collector/RowCollector.java     |   2 +-
 .../PipeCollectorRuntimeConfiguration.java         |  13 +-
 .../org/apache/iotdb/pipe/api/event/Event.java     |   6 +-
 .../org/apache/iotdb/pipe/api/event/EventType.java |   8 +-
 .../event/{ => dml}/deletion/DeletionEvent.java    |   8 +-
 .../{ => dml}/insertion/TabletInsertionEvent.java  |   8 +-
 .../{ => dml}/insertion/TsFileInsertionEvent.java  |   8 +-
 pom.xml                                            |   6 +
 server/pom.xml                                     |   6 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |  17 +-
 .../apache/iotdb/db/pipe/config/PipeConfig.java    |  65 ++++++
 .../collector/PipeCollectorEventPendingQueue.java  |  22 --
 .../realtime/PipeRealtimeDataRegionCollector.java  |  81 +++++++
 .../PipeRealtimeHybridDataRegionCollector.java     | 171 +++++++++++++++
 .../realtime/assigner/DisruptorQueue.java          | 111 ++++++++++
 .../realtime/assigner/PipeDataRegionAssigner.java  |  74 +++++++
 .../realtime/cache/PipeRealtimeEventCache.java     |  22 --
 .../realtime/listener/IoTLogListerner.java         |  22 --
 .../listener/PipeInsertionDataNodeListener.java    | 115 ++++++++++
 .../realtime/listener/RatisLogListener.java        |  22 --
 .../realtime/listener/SimpleLogListener.java       |  22 --
 .../listener/TsFileGenerationListener.java         |  22 --
 .../matcher/CachedSchemaPatternMatcher.java        | 200 +++++++++++++++++
 .../realtime/matcher/PipeDataRegionMatcher.java    |  47 ++++
 .../pipe/core/collector/realtime/matcher/Rule.java |  22 --
 .../realtime/matcher/RulePrefixMatchTree.java      |  22 --
 .../collector/realtime/recorder/TsFileEpoch.java   |  22 --
 .../realtime/recorder/TsFileEpochRecorder.java     |  22 --
 .../event/{ => impl}/PipeTabletInsertionEvent.java |  22 +-
 .../event/{ => impl}/PipeTsFileInsertionEvent.java |  23 +-
 .../pipe/core/event/indexer/PipeEventIndexer.java  |  22 --
 .../core/event/indexer/PipeIoTEventIndexer.java    |  22 --
 .../core/event/indexer/PipeRatisEventIndexer.java  |  22 --
 .../core/event/indexer/PipeSimpleEventIndexer.java |  22 --
 .../core/event/indexer/PipeTsFileEventIndexer.java |  22 --
 .../event/realtime/PipeRealtimeCollectEvent.java   |  66 ++++++
 .../realtime/PipeRealtimeCollectEventFactory.java  |  49 +++++
 .../db/pipe/core/event/realtime/TsFileEpoch.java   |  68 ++++++
 .../core/event/realtime/TsFileEpochManager.java    |  70 ++++++
 .../realtime/TsFileEpochStateMigrator.java}        |   7 +-
 .../pipe/core/event/{ => view}/access/PipeRow.java |   2 +-
 .../event/{ => view}/access/PipeRowIterator.java   |   2 +-
 .../{ => view}/collector/PipeEventCollector.java   |   8 +-
 .../{ => view}/collector/PipeRowCollector.java     |   2 +-
 .../PipeResourceManager.java}                      |  29 ++-
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |   6 +-
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |   6 +-
 .../collector/CachedSchemaPatternMatcherTest.java  | 149 +++++++++++++
 .../core/collector/PipeRealtimeCollectTest.java    | 236 +++++++++++++++++++++
 server/src/test/resources/logback-test.xml         |   1 +
 56 files changed, 1753 insertions(+), 398 deletions(-)

diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
index 503005fefa..00f3b80424 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import 
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 public class DoNothingConnector implements PipeConnector {
 
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
index 45634119db..6a18f9a64e 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import 
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import java.io.IOException;
 
diff --git 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
new file mode 100644
index 0000000000..db08728faf
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api;
+
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.Event;
+
+/**
+ * PipeCollector
+ *
+ * <p>PipeCollector is responsible for capturing events from sources.
+ *
+ * <p>Various data sources can be supported by implementing different 
PipeCollector classes.
+ *
+ * <p>The lifecycle of a PipeCollector is as follows:
+ *
+ * <ul>
+ *   <li>When a collaboration task is created, the KV pairs of `WITH 
COLLECTOR` clause in SQL are
+ *       parsed and the validation method {@link 
PipeCollector#validate(PipeParameterValidator)}
+ *       will be called to validate the parameters.
+ *   <li>Before the collaboration task starts, the method {@link
+ *       PipeCollector#customize(PipeParameters, 
PipeCollectorRuntimeConfiguration)} will be called
+ *       to configure the runtime behavior of the PipeCollector.
+ *   <li>Then the method {@link PipeCollector#start()} will be called to start 
the PipeCollector.
+ *   <li>While the collaboration task is in progress, the method {@link 
PipeCollector#supply()} will
+ *       be called to capture events from sources and then the events will be 
passed to the
+ *       PipeProcessor.
+ *   <li>The method {@link PipeCollector#close()} will be called when the 
collaboration task is
+ *       cancelled (the `DROP PIPE` command is executed).
+ * </ul>
+ */
+public interface PipeCollector extends PipePlugin {
+
+  /**
+   * This method is mainly used to validate {@link PipeParameters} and it is 
executed before {@link
+   * PipeCollector#customize(PipeParameters, 
PipeCollectorRuntimeConfiguration)} is called.
+   *
+   * @param validator the validator used to validate {@link PipeParameters}
+   * @throws Exception if any parameter is not valid
+   */
+  void validate(PipeParameterValidator validator) throws Exception;
+
+  /**
+   * This method is mainly used to customize PipeCollector. In this method, 
the user can do the
+   * following things:
+   *
+   * <ul>
+   *   <li>Use PipeParameters to parse key-value pair attributes entered by 
the user.
+   *   <li>Set the running configurations in PipeCollectorRuntimeConfiguration.
+   * </ul>
+   *
+   * <p>This method is called after the method {@link
+   * PipeCollector#validate(PipeParameterValidator)} is called.
+   *
+   * @param parameters used to parse the input parameters entered by the user
+   * @param configuration used to set the required properties of the running 
PipeCollector
+   * @throws Exception the user can throw errors if necessary
+   */
+  void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration 
configuration)
+      throws Exception;
+
+  /**
+   * Start the collector. After this method is called, events should be ready 
to be supplied by
+   * {@link PipeCollector#supply()}. This method is called after {@link
+   * PipeCollector#customize(PipeParameters, 
PipeCollectorRuntimeConfiguration)} is called.
+   *
+   * @throws Exception the user can throw errors if necessary
+   */
+  void start() throws Exception;
+
+  /**
+   * Supply a single event from the collector and the caller will send the event 
to the processor.
+   * This method is called after {@link PipeCollector#start()} is called.
+   *
+   * @return the event to be supplied. The event may be null if the collector 
has no more events at
+   *     the moment, but the collector is still running for more events.
+   * @throws Exception the user can throw errors if necessary
+   */
+  Event supply() throws Exception;
+}
diff --git 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index 78b42c5c70..2c9bbff5aa 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.pipe.api;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import 
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 /**
  * PipeConnector
diff --git 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
index ef0b585eae..16a5e81ba6 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import 
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 /**
  * PipeProcessor
diff --git 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
index 17b8421e95..2e53693d65 100644
--- 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
+++ 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.pipe.api.collector;
 
 import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import java.io.IOException;
 
diff --git 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
index ee32de275e..bdd96be926 100644
--- 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
+++ 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.pipe.api.collector;
 
 import org.apache.iotdb.pipe.api.access.Row;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import java.io.IOException;
 import java.util.function.BiConsumer;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java
similarity index 70%
copy from 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
copy to 
pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java
index 525e79c137..e99d21be71 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
+++ 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java
@@ -17,15 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.event.collector;
+package org.apache.iotdb.pipe.api.customizer.collector;
 
-import org.apache.iotdb.pipe.api.access.Row;
-import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
-import java.io.IOException;
-
-public class PipeRowCollector implements RowCollector {
+// TODO: complete this class
+public class PipeCollectorRuntimeConfiguration implements 
PipeRuntimeConfiguration {
 
   @Override
-  public void collectRow(Row row) throws IOException {}
+  public void check() throws PipeException {}
 }
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
index 74ddf9e47d..f5d8d2fbcd 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
@@ -20,4 +20,8 @@
 package org.apache.iotdb.pipe.api.event;
 
 /** This interface is used to abstract events in collaboration tasks. */
-public interface Event {}
+public interface Event {
+
+  /** @return the type of the event */
+  EventType getType();
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
 b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/EventType.java
similarity index 87%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
rename to pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/EventType.java
index e1c50a9601..20be75367d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/EventType.java
@@ -17,6 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.collector;
+package org.apache.iotdb.pipe.api.event;
 
-public class PipeCollectorEventSelector {}
+public enum EventType {
+  TABLET_INSERTION,
+  TSFILE_INSERTION,
+  DELETION,
+}
diff --git 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/deletion/DeletionEvent.java
 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
similarity index 87%
rename from 
pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/deletion/DeletionEvent.java
rename to 
pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
index 8a32339cf4..d1fb966379 100644
--- 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/deletion/DeletionEvent.java
+++ 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
@@ -17,9 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.event.deletion;
+package org.apache.iotdb.pipe.api.event.dml.deletion;
 
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.EventType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 
@@ -39,4 +40,9 @@ public interface DeletionEvent extends Event {
    * @return TimeRange
    */
   TimeRange getTimeRange();
+
+  @Override
+  default EventType getType() {
+    return EventType.DELETION;
+  }
 }
diff --git 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TabletInsertionEvent.java
 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
similarity index 91%
rename from 
pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TabletInsertionEvent.java
rename to 
pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index 5880f0fa71..e353bf397e 100644
--- 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TabletInsertionEvent.java
+++ 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.event.insertion;
+package org.apache.iotdb.pipe.api.event.dml.insertion;
 
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.EventType;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import java.util.Iterator;
@@ -53,4 +54,9 @@ public interface TabletInsertionEvent extends Event {
    *     RowCollector
    */
   TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer);
+
+  @Override
+  default EventType getType() {
+    return EventType.TABLET_INSERTION;
+  }
 }
diff --git 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TsFileInsertionEvent.java
 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
similarity index 89%
rename from 
pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TsFileInsertionEvent.java
rename to 
pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
index 7f324b8de6..2d5badc807 100644
--- 
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TsFileInsertionEvent.java
+++ 
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
@@ -17,9 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.event.insertion;
+package org.apache.iotdb.pipe.api.event.dml.insertion;
 
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.EventType;
 
 /**
  * TsFileInsertionEvent is used to define the event of writing TsFile. Event 
data stores in disks,
@@ -41,4 +42,9 @@ public interface TsFileInsertionEvent extends Event {
    * @return TsFileInsertionEvent
    */
   TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> 
iterable);
+
+  @Override
+  default EventType getType() {
+    return EventType.TSFILE_INSERTION;
+  }
 }
diff --git a/pom.xml b/pom.xml
index d77888658f..a4e1f696de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,6 +149,7 @@
         <thrift.version>0.14.1</thrift.version>
         <airline.version>0.8</airline.version>
         <jackson.version>2.13.5</jackson.version>
+        <disrupter.version>3.4.2</disrupter.version>
         <jackson.databind.version>2.13.4.2</jackson.databind.version>
         <antlr4.version>4.8-1</antlr4.version>
         <common.cli.version>1.3.1</common.cli.version>
@@ -283,6 +284,11 @@
                 <artifactId>guava</artifactId>
                 <version>[${guava.version},)</version>
             </dependency>
+            <dependency>
+                <groupId>com.lmax</groupId>
+                <artifactId>disruptor</artifactId>
+                <version>${disrupter.version}</version>
+            </dependency>
             <dependency>
                 <groupId>commons-cli</groupId>
                 <artifactId>commons-cli</artifactId>
diff --git a/server/pom.xml b/server/pom.xml
index 187f94872f..fbda65880f 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -212,7 +212,6 @@
         <dependency>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
-            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
@@ -222,7 +221,10 @@
             <groupId>org.apache.iotdb</groupId>
             <artifactId>node-commons</artifactId>
             <version>${project.version}</version>
-            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.lmax</groupId>
+            <artifactId>disruptor</artifactId>
         </dependency>
     </dependencies>
     <build>
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 395c372ac3..f4f33c1047 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.rescon.MemTableManager;
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
@@ -278,6 +279,11 @@ public class TsFileProcessor {
       workMemTable.insert(insertRowNode);
     }
 
+    // collect plan node in pipe
+    PipeInsertionDataNodeListener.getInstance()
+        .listenToInsertNode(
+            dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, 
tsFileResource);
+
     // update start time of this memtable
     tsFileResource.updateStartTime(
         insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime());
@@ -379,13 +385,17 @@ public class TsFileProcessor {
       }
       throw new WriteProcessException(e);
     }
-
     for (int i = start; i < end; i++) {
       results[i] = RpcUtils.SUCCESS_STATUS;
     }
+
+    // collect plan node in pipe
+    PipeInsertionDataNodeListener.getInstance()
+        .listenToInsertNode(
+            dataRegionInfo.getDataRegion().getDataRegionId(), 
insertTabletNode, tsFileResource);
+
     tsFileResource.updateStartTime(
         insertTabletNode.getDeviceID().toStringID(), 
insertTabletNode.getTimes()[start]);
-
     // for sequence tsfile, we update the endTime only when the file is 
prepared to be closed.
     // for unsequence tsfile, we have to update the endTime for each insertion.
     if (!sequence) {
@@ -843,6 +853,9 @@ public class TsFileProcessor {
                 
.getOrCreateSyncManager(dataRegionInfo.getDataRegion().getDataRegionId())) {
           syncManager.syncRealTimeTsFile(tsFileResource.getTsFile());
         }
+        PipeInsertionDataNodeListener.getInstance()
+            .listenToTsFile(dataRegionInfo.getDataRegion().getDataRegionId(), 
tsFileResource);
+
         // When invoke closing TsFile after insert data to memTable, we 
shouldn't flush until invoke
         // flushing memTable in System module.
         addAMemtableIntoFlushingList(tmpMemTable);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
new file mode 100644
index 0000000000..49a34d03e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.config;
+
+// TODO: make these parameters configurable
+// TODO: put all pipe-related parameters in one place
+// TODO: set the default value of the parameters in IoTDBDescriptor
+// TODO: move it to common module?
+public class PipeConfig {
+
+  private final int defaultRingBufferSize = 65536;
+
+  private final int matcherCacheSize = 1024;
+
+  private final int realtimeCollectorPendingQueueCapacity = 65536;
+
+  // this should be less than or equal to 
realtimeCollectorPendingQueueCapacity
+  private final int realtimeCollectorPendingQueueTabletLimit =
+      realtimeCollectorPendingQueueCapacity / 2;
+
+  public int getDefaultRingBufferSize() {
+    return defaultRingBufferSize;
+  }
+
+  public int getMatcherCacheSize() {
+    return matcherCacheSize;
+  }
+
+  public int getRealtimeCollectorPendingQueueCapacity() {
+    return realtimeCollectorPendingQueueCapacity;
+  }
+
+  public int getRealtimeCollectorPendingQueueTabletLimit() {
+    return realtimeCollectorPendingQueueTabletLimit;
+  }
+
+  /////////////////////////////// Singleton ///////////////////////////////
+
+  private PipeConfig() {}
+
+  public static PipeConfig getInstance() {
+    return PipeConfigHolder.INSTANCE;
+  }
+
+  private static class PipeConfigHolder {
+    private static final PipeConfig INSTANCE = new PipeConfig();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
deleted file mode 100644
index c3bfb68015..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector;
-
-public class PipeCollectorEventPendingQueue {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
new file mode 100644
index 0000000000..452831c6d6
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime;
+
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+public abstract class PipeRealtimeDataRegionCollector implements PipeCollector 
{
+
+  protected final String pattern;
+  protected final String dataRegionId;
+
+  public PipeRealtimeDataRegionCollector(String pattern, String dataRegionId) {
+    this.pattern = pattern;
+    this.dataRegionId = dataRegionId;
+  }
+
+  @Override
+  public final void validate(PipeParameterValidator validator) throws 
PipeException {
+    // TODO: complete this method
+  }
+
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeCollectorRuntimeConfiguration 
configuration) {
+    // TODO: complete this method
+  }
+
+  @Override
+  public void start() {
+    // TODO: if the collector is not started, start it. if the collector is 
started, do nothing.
+    
PipeInsertionDataNodeListener.getInstance().startListenAndAssign(dataRegionId, 
this);
+  }
+
+  @Override
+  public void close() {
+    // TODO: if the collector is not closed, close it. if the collector is 
closed, do nothing.
+    
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, 
this);
+  }
+
+  /** @param event the event from the storage engine */
+  public abstract void collect(PipeRealtimeCollectEvent event);
+
+  public final String getPattern() {
+    return pattern;
+  }
+
+  @Override
+  public String toString() {
+    return "PipeRealtimeDataRegionCollector{"
+        + "pattern='"
+        + pattern
+        + '\''
+        + ", dataRegionId='"
+        + dataRegionId
+        + '\''
+        + '}';
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeHybridDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeHybridDataRegionCollector.java
new file mode 100644
index 0000000000..76aeebc530
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeHybridDataRegionCollector.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime;
+
+import org.apache.iotdb.db.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+// TODO: make this collector a builtin pipe plugin and register it in 
BuiltinPipePlugin.
+public class PipeRealtimeHybridDataRegionCollector extends 
PipeRealtimeDataRegionCollector {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeRealtimeHybridDataRegionCollector.class);
+
+  // TODO: memory control
+  // This queue is used to store pending events collected by the method 
collect(). The method
+  // supply() will poll events from this queue and send them to the next pipe 
plugin.
+  private final ArrayBlockingQueue<PipeRealtimeCollectEvent> pendingQueue;
+
+  public PipeRealtimeHybridDataRegionCollector(String pattern, String 
dataRegionId) {
+    super(pattern, dataRegionId);
+    this.pendingQueue =
+        new ArrayBlockingQueue<>(
+            
PipeConfig.getInstance().getRealtimeCollectorPendingQueueCapacity());
+  }
+
+  @Override
+  public void collect(PipeRealtimeCollectEvent event) {
+    switch (event.getEvent().getType()) {
+      case TABLET_INSERTION:
+        collectTabletInsertion(event);
+        break;
+      case TSFILE_INSERTION:
+        collectTsFileInsertion(event);
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            String.format(
+                "Unsupported event type %s for Hybrid Realtime Collector %s",
+                event.getEvent().getType(), this));
+    }
+  }
+
+  private void collectTabletInsertion(PipeRealtimeCollectEvent event) {
+    if (isApproachingCapacity()) {
+      event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
+      // if the pending queue is approaching capacity, we should not collect 
any more tablet events.
+      // all the data represented by the tablet events should be carried by 
the following tsfile
+      // event.
+      return;
+    }
+
+    if 
(!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) 
{
+      pendingQueue.offer(event);
+    }
+  }
+
+  private void collectTsFileInsertion(PipeRealtimeCollectEvent event) {
+    event
+        .getTsFileEpoch()
+        .migrateState(
+            this,
+            state ->
+                state.equals(TsFileEpoch.State.EMPTY) ? 
TsFileEpoch.State.USING_TSFILE : state);
+
+    if (!pendingQueue.offer(event)) {
+      LOGGER.warn(
+          String.format(
+              "Pending Queue of Hybrid Realtime Collector %s has reached 
capacity, discard TsFile Event %s, current state %s",
+              this, event, event.getTsFileEpoch().getState(this)));
+      // TODO: more degradation strategies
+      // TODO: dynamic control of the pending queue capacity
+      // TODO: should be handled by the PipeRuntimeAgent
+    }
+  }
+
+  private boolean isApproachingCapacity() {
+    return pendingQueue.size()
+        >= 
PipeConfig.getInstance().getRealtimeCollectorPendingQueueTabletLimit();
+  }
+
+  @Override
+  public Event supply() {
+    PipeRealtimeCollectEvent collectEvent = pendingQueue.poll();
+
+    while (collectEvent != null) {
+      Event suppliedEvent;
+      switch (collectEvent.getEvent().getType()) {
+        case TABLET_INSERTION:
+          suppliedEvent = supplyTabletInsertion(collectEvent);
+          break;
+        case TSFILE_INSERTION:
+          suppliedEvent = supplyTsFileInsertion(collectEvent);
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Unsupported event type %s for Hybrid Realtime Collector %s",
+                  collectEvent.getEvent().getType(), this));
+      }
+      if (suppliedEvent != null) {
+        return suppliedEvent;
+      }
+
+      collectEvent = pendingQueue.poll();
+    }
+
+    // means the pending queue is empty.
+    return null;
+  }
+
+  private Event supplyTabletInsertion(PipeRealtimeCollectEvent event) {
+    event
+        .getTsFileEpoch()
+        .migrateState(
+            this,
+            state ->
+                (state.equals(TsFileEpoch.State.EMPTY)) ? 
TsFileEpoch.State.USING_TABLET : state);
+
+    if 
(event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TABLET)) {
+      return event.getEvent();
+    }
+    // if the state is USING_TSFILE, discard the event and poll the next one.
+    return null;
+  }
+
+  private Event supplyTsFileInsertion(PipeRealtimeCollectEvent event) {
+    event
+        .getTsFileEpoch()
+        .migrateState(
+            this,
+            state -> {
+              // this would not happen, but just in case.
+              if (state.equals(TsFileEpoch.State.EMPTY)) {
+                LOGGER.warn(
+                    String.format("EMPTY TsFileEpoch when supplying TsFile 
Event %s", event));
+                return TsFileEpoch.State.USING_TSFILE;
+              }
+              return state;
+            });
+
+    if 
(event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) {
+      return event.getEvent();
+    }
+    // if the state is USING_TABLET, discard the event and poll the next one.
+    return null;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
new file mode 100644
index 0000000000..163b450021
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.assigner;
+
+import org.apache.iotdb.db.pipe.config.PipeConfig;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadFactory;
+
+public class DisruptorQueue<E> {
+  private Disruptor<Container<E>> disruptor;
+  private RingBuffer<Container<E>> ringBuffer;
+
+  private DisruptorQueue() {}
+
+  public void publish(E obj) {
+    ringBuffer.publishEvent((container, sequence, o) -> container.setObj(o), 
obj);
+  }
+
+  public void clear() {
+    disruptor.halt();
+  }
+
+  public static class Builder<E> {
+    private int ringBufferSize = 
PipeConfig.getInstance().getDefaultRingBufferSize();
+    private ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
+    private ProducerType producerType = ProducerType.MULTI;
+    private WaitStrategy waitStrategy = new BlockingWaitStrategy();
+    private final List<EventHandler<E>> handlers = new ArrayList<>();
+
+    public Builder<E> setRingBufferSize(int ringBufferSize) {
+      this.ringBufferSize = ringBufferSize;
+      return this;
+    }
+
+    public Builder<E> setThreadFactory(ThreadFactory threadFactory) {
+      this.threadFactory = threadFactory;
+      return this;
+    }
+
+    public Builder<E> setProducerType(ProducerType producerType) {
+      this.producerType = producerType;
+      return this;
+    }
+
+    public Builder<E> setWaitStrategy(WaitStrategy waitStrategy) {
+      this.waitStrategy = waitStrategy;
+      return this;
+    }
+
+    public Builder<E> addEventHandler(EventHandler<E> eventHandler) {
+      this.handlers.add(eventHandler);
+      return this;
+    }
+
+    public DisruptorQueue<E> build() {
+      DisruptorQueue<E> disruptorQueue = new DisruptorQueue<>();
+      disruptorQueue.disruptor =
+          new Disruptor<>(
+              Container::new, ringBufferSize, threadFactory, producerType, 
waitStrategy);
+      for (EventHandler<E> handler : handlers) {
+        disruptorQueue.disruptor.handleEventsWith(
+            (container, sequence, endOfBatch) ->
+                handler.onEvent(container.getObj(), sequence, endOfBatch));
+      }
+      disruptorQueue.disruptor.start();
+      disruptorQueue.ringBuffer = disruptorQueue.disruptor.getRingBuffer();
+      return disruptorQueue;
+    }
+  }
+
+  private static class Container<E> {
+    private E obj;
+
+    private Container() {}
+
+    public E getObj() {
+      return obj;
+    }
+
+    public void setObj(E obj) {
+      this.obj = obj;
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
new file mode 100644
index 0000000000..38ee9257be
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.assigner;
+
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.matcher.PipeDataRegionMatcher;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+
+import com.lmax.disruptor.dsl.ProducerType;
+
+public class PipeDataRegionAssigner {
+
+  /** The matcher is used to match the event with the collector based on the 
pattern. */
+  private final PipeDataRegionMatcher matcher;
+
+  /** The disruptor is used to assign the event to the collector. */
+  private final DisruptorQueue<PipeRealtimeCollectEvent> disruptor;
+
+  public PipeDataRegionAssigner() {
+    this.matcher = new CachedSchemaPatternMatcher();
+    this.disruptor =
+        new DisruptorQueue.Builder<PipeRealtimeCollectEvent>()
+            .setProducerType(ProducerType.SINGLE)
+            .addEventHandler(
+                (event, sequence, endOfBatch) -> {
+                  matcher.match(event);
+                  event.gcSchemaInfo();
+                })
+            .build();
+  }
+
+  public void publishToAssign(PipeRealtimeCollectEvent event) {
+    disruptor.publish(event);
+  }
+
+  public void startAssignTo(PipeRealtimeDataRegionCollector collector) {
+    matcher.register(collector);
+  }
+
+  public void stopAssignTo(PipeRealtimeDataRegionCollector collector) {
+    matcher.deregister(collector);
+  }
+
+  public boolean notMoreCollectorNeededToBeAssigned() {
+    return matcher.getRegisterCount() == 0;
+  }
+
+  /**
+   * Clear the matcher and disruptor. The method publishToAssign should be 
work after calling this
+   * method.
+   */
+  public void gc() {
+    matcher.clear();
+    disruptor.clear();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
deleted file mode 100644
index 7525e06b54..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.cache;
-
-public class PipeRealtimeEventCache {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
deleted file mode 100644
index b91334430d..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
-
-public class IoTLogListerner {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
new file mode 100644
index 0000000000..bfa764c371
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.assigner.PipeDataRegionAssigner;
+import 
org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEventFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * PipeInsertionDataNodeListener is a singleton in each data node.
+ *
+ * <p>It is used to listen to events from storage engine and publish them to 
pipe engine.
+ *
+ * <p>2 kinds of events are collected: 1. level-0 tsfile sealed event 2. 
insertion operation event
+ *
+ * <p>All events collected by this listener will be first published to 
different
+ * PipeDataRegionAssigners (identified by data region id), and then 
PipeDataRegionAssigner
+ * will filter events and assign them to different 
PipeRealtimeDataRegionCollectors.
+ */
+public class PipeInsertionDataNodeListener {
+
+  private final ConcurrentMap<String, PipeDataRegionAssigner> 
dataRegionId2Assigner =
+      new ConcurrentHashMap<>();
+
+  public synchronized void startListenAndAssign(
+      String dataRegionId, PipeRealtimeDataRegionCollector collector) {
+    dataRegionId2Assigner
+        .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner())
+        .startAssignTo(collector);
+  }
+
+  public synchronized void stopListenAndAssign(
+      String dataRegionId, PipeRealtimeDataRegionCollector collector) {
+    final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
+    if (assigner == null) {
+      return;
+    }
+
+    assigner.stopAssignTo(collector);
+
+    if (assigner.notMoreCollectorNeededToBeAssigned()) {
+      // the removed assigner is the same as the one referenced by the 
variable `assigner`
+      dataRegionId2Assigner.remove(dataRegionId);
+      // this will help to release the memory occupied by the assigner
+      assigner.gc();
+    }
+  }
+
+  // TODO: listen to the tsfile synced from the other cluster
+  // TODO: check whether the method is called in the right place. what is the 
meaning of the
+  // variable shouldClose before calling this method?
+  // TODO: maximize the efficiency of the method when there is no pipe in the 
system, avoid
+  // dataRegionId2Assigner.get(dataRegionId);
+  public void listenToTsFile(String dataRegionId, TsFileResource 
tsFileResource) {
+    final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
+
+    // only events from registered data region will be collected
+    if (assigner == null) {
+      return;
+    }
+
+    assigner.publishToAssign(
+        PipeRealtimeCollectEventFactory.createCollectEvent(
+            tsFileResource.getTsFile(), tsFileResource));
+  }
+
+  // TODO: check whether the method is called in the right place.
+  public void listenToInsertNode(
+      String dataRegionId, InsertNode insertNode, TsFileResource 
tsFileResource) {
+    final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
+
+    // only events from registered data region will be collected
+    if (assigner == null) {
+      return;
+    }
+
+    assigner.publishToAssign(
+        PipeRealtimeCollectEventFactory.createCollectEvent(insertNode, 
tsFileResource));
+  }
+
+  /////////////////////////////// singleton ///////////////////////////////
+
+  private PipeInsertionDataNodeListener() {}
+
+  public static PipeInsertionDataNodeListener getInstance() {
+    return PipeChangeDataCaptureListenerHolder.INSTANCE;
+  }
+
+  private static class PipeChangeDataCaptureListenerHolder {
+    private static final PipeInsertionDataNodeListener INSTANCE =
+        new PipeInsertionDataNodeListener();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
deleted file mode 100644
index 3aa42354f5..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
-
-public class RatisLogListener {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
deleted file mode 100644
index b5e1eaf93b..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
-
-public class SimpleLogListener {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
deleted file mode 100644
index 4c28795219..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
-
-public class TsFileGenerationListener {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
new file mode 100644
index 0000000000..0589cf09d7
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
+
+import org.apache.iotdb.db.pipe.config.PipeConfig;
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A {@link PipeDataRegionMatcher} that assigns realtime events to the registered collectors by
+ * comparing each collector's pattern with the devices (and, when present, the measurements)
+ * carried by the event.
+ *
+ * <p>The candidate collectors of each device are memoized in a size-bounded Caffeine cache, which
+ * is invalidated whenever the set of registered collectors changes.
+ *
+ * <p>{@link #register}, {@link #deregister} and {@link #clear} hold the write lock; {@link
+ * #getRegisterCount} and the matching phase of {@link #match} hold the read lock.
+ */
+public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CachedSchemaPatternMatcher.class);
+
+  private final ReentrantReadWriteLock lock;
+
+  private final Set<PipeRealtimeDataRegionCollector> collectors;
+  private final Cache<String, Set<PipeRealtimeDataRegionCollector>> deviceToCollectorsCache;
+
+  public CachedSchemaPatternMatcher() {
+    this.lock = new ReentrantReadWriteLock();
+    this.collectors = new HashSet<>();
+    this.deviceToCollectorsCache =
+        Caffeine.newBuilder().maximumSize(PipeConfig.getInstance().getMatcherCacheSize()).build();
+  }
+
+  @Override
+  public void register(PipeRealtimeDataRegionCollector collector) {
+    lock.writeLock().lock();
+    try {
+      collectors.add(collector);
+      // cached candidate sets were computed without this collector
+      deviceToCollectorsCache.invalidateAll();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void deregister(PipeRealtimeDataRegionCollector collector) {
+    lock.writeLock().lock();
+    try {
+      collectors.remove(collector);
+      // cached candidate sets may still contain this collector
+      deviceToCollectorsCache.invalidateAll();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public int getRegisterCount() {
+    lock.readLock().lock();
+    try {
+      return collectors.size();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  // TODO: maximize the efficiency of matching when pattern is root
+  // TODO: memory control
+  /**
+   * Collects the event into every registered collector whose pattern matches at least one of the
+   * (device, measurements) entries of the event's schema info.
+   *
+   * <p>The matched collectors are determined while holding the read lock; {@code collect} is
+   * invoked on them after the lock has been released.
+   *
+   * @param event the event to be matched and dispatched
+   */
+  @Override
+  public void match(PipeRealtimeCollectEvent event) {
+    final Set<PipeRealtimeDataRegionCollector> matchedCollectors = new HashSet<>();
+
+    lock.readLock().lock();
+    try {
+      if (collectors.isEmpty()) {
+        return;
+      }
+
+      for (final Map.Entry<String, String[]> entry : event.getSchemaInfo().entrySet()) {
+        final String device = entry.getKey();
+        final String[] measurements = entry.getValue();
+
+        // 1. try to get matched collectors from cache, if not success, match them by device
+        final Set<PipeRealtimeDataRegionCollector> collectorsFilteredByDevice =
+            deviceToCollectorsCache.get(device, this::filterCollectorsByDevice);
+        // this would not happen
+        if (collectorsFilteredByDevice == null) {
+          LOGGER.warn("Match result NPE when handle device {}", device);
+          continue;
+        }
+
+        // 2. filter matched candidate collectors by measurements
+        if (measurements.length == 0) {
+          // `measurements` is empty (only in case of tsfile event). match all collectors.
+          //
+          // case 1: for example, pattern is root.a.b, device is root.a.b.c, measurement can be
+          // any. in this case, the collector can be matched without checking the measurements.
+          //
+          // case 2: for example, pattern is root.a.b.c, device is root.a.b.
+          // in this situation, the collector can not be matched in some cases, but we can not
+          // know all the measurements of the device in an efficient way, so we ASSUME that the
+          // collector can be matched. this is a trade-off between efficiency and accuracy. for
+          // most user's usage, this is acceptable, which may result in some unnecessary data
+          // processing and transmission, but will not result in data loss.
+          matchedCollectors.addAll(collectorsFilteredByDevice);
+        } else {
+          // `measurements` is not empty (only in case of tablet event). match collectors by
+          // measurements.
+          for (final PipeRealtimeDataRegionCollector collector : collectorsFilteredByDevice) {
+            if (isMatchedByMeasurements(collector.getPattern(), device, measurements)) {
+              matchedCollectors.add(collector);
+            }
+          }
+        }
+
+        // every registered collector is already matched, no need to check the other devices
+        if (matchedCollectors.size() == collectors.size()) {
+          break;
+        }
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+
+    matchedCollectors.forEach(collector -> collector.collect(event));
+  }
+
+  /**
+   * Decides whether a candidate collector (already selected by {@link #filterCollectorsByDevice})
+   * matches the given device with the given measurements.
+   *
+   * @param pattern the pattern of the candidate collector
+   * @param device the device carried by the event
+   * @param measurements the non-empty measurements of the device carried by the event
+   * @return {@code true} if the pattern is not longer than the device, or if the pattern equals
+   *     the device joined with one of the measurements by the path separator
+   */
+  private static boolean isMatchedByMeasurements(
+      String pattern, String device, String[] measurements) {
+    // case 1: for example, pattern is root.a.b and device is root.a.b.c
+    // in this case, the collector can be matched without checking the measurements
+    if (pattern.length() <= device.length()) {
+      return true;
+    }
+
+    // case 2: for example, pattern is root.a.b.c and device is root.a.b
+    // in this case, we need to check the full path
+    for (final String measurement : measurements) {
+      // defensive: skip a null entry instead of failing the whole match with a
+      // NullPointerException
+      if (measurement == null) {
+        continue;
+      }
+
+      // for example, pattern is root.a.b.c, device is root.a.b and measurement is c
+      // in this case, the collector can be matched. other cases are not matched.
+      // please note that there should be a . between device and measurement.
+      if (
+      // low cost check comes first
+      pattern.length() == device.length() + measurement.length() + 1
+          // high cost check comes later
+          && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) {
+        // no need to check the remaining measurements, one match is enough
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Selects the registered collectors that may match the given device. Used as the loader of the
+   * device cache, i.e. it is invoked from {@link #match} while the read lock is held.
+   *
+   * @param device the device to be matched
+   * @return the collectors whose pattern is a prefix of the device, plus the collectors whose
+   *     pattern is longer than the device and starts with it (the latter still need to be checked
+   *     against the measurements)
+   */
+  private Set<PipeRealtimeDataRegionCollector> filterCollectorsByDevice(String device) {
+    final Set<PipeRealtimeDataRegionCollector> filteredCollectors = new HashSet<>();
+
+    for (PipeRealtimeDataRegionCollector collector : collectors) {
+      String pattern = collector.getPattern();
+      if (
+      // for example, pattern is root.a.b and device is root.a.b.c
+      // in this case, the collector can be matched without checking the measurements
+      (pattern.length() <= device.length() && device.startsWith(pattern))
+          // for example, pattern is root.a.b.c and device is root.a.b
+          // in this case, the collector can be selected as candidate, but the measurements
+          // should be checked further
+          || (pattern.length() > device.length() && pattern.startsWith(device))) {
+        filteredCollectors.add(collector);
+      }
+    }
+
+    return filteredCollectors;
+  }
+
+  @Override
+  public void clear() {
+    lock.writeLock().lock();
+    try {
+      collectors.clear();
+      deviceToCollectorsCache.invalidateAll();
+      deviceToCollectorsCache.cleanUp();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java
new file mode 100644
index 0000000000..dcdbffa71e
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
+
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+
+public interface PipeDataRegionMatcher {
+
+  /**
+   * Register a collector. If the collector's pattern matches the event's schema info, the event
+   * will be assigned to the collector.
+   *
+   * @param collector the collector to be taken into account by subsequent calls of {@link
+   *     #match(PipeRealtimeCollectEvent)}
+   */
+  void register(PipeRealtimeDataRegionCollector collector);
+
+  /**
+   * Deregister a collector.
+   *
+   * @param collector the collector to be removed from this matcher
+   */
+  void deregister(PipeRealtimeDataRegionCollector collector);
+
+  /**
+   * Get the number of registered collectors in this matcher.
+   *
+   * @return the number of collectors currently registered
+   */
+  int getRegisterCount();
+
+  /**
+   * Match the event's schema info with the registered collectors' patterns. If the event's schema
+   * info matches the pattern of a collector, the event will be assigned to the collector.
+   *
+   * @param event the event whose schema info is matched against the registered patterns
+   */
+  void match(PipeRealtimeCollectEvent event);
+
+  /** Clear all the registered collectors and internal data structures. */
+  void clear();
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
deleted file mode 100644
index 948fab2acd..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
-
-public class Rule {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
deleted file mode 100644
index a543d7f0ef..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
-
-public class RulePrefixMatchTree {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
deleted file mode 100644
index a85d3a0b70..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
-
-public class TsFileEpoch {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
deleted file mode 100644
index 6948cef200..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
-
-public class TsFileEpochRecorder {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
similarity index 68%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index a58e8db497..3afeabf9ea 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.event;
+package org.apache.iotdb.db.pipe.core.event.impl;
 
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import java.util.Iterator;
@@ -29,18 +30,29 @@ import java.util.function.BiConsumer;
 
 public class PipeTabletInsertionEvent implements TabletInsertionEvent {
 
+  private final InsertNode insertNode;
+
+  public PipeTabletInsertionEvent(InsertNode insertNode) {
+    this.insertNode = insertNode;
+  }
+
   @Override
   public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
-    return null;
+    throw new UnsupportedOperationException("Not implemented yet");
   }
 
   @Override
   public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>, 
RowCollector> consumer) {
-    return null;
+    throw new UnsupportedOperationException("Not implemented yet");
   }
 
   @Override
   public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
-    return null;
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  @Override
+  public String toString() {
+    return "PipeTabletInsertionEvent{" + "insertNode=" + insertNode + '}';
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
similarity index 65%
copy from 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
copy to 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index d11b4f780d..7282fb77bc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -17,20 +17,33 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.event;
+package org.apache.iotdb.db.pipe.core.event.impl;
 
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+
+import java.io.File;
 
 public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
 
+  private final File tsFile;
+
+  public PipeTsFileInsertionEvent(File tsFile) {
+    this.tsFile = tsFile;
+  }
+
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
-    return null;
+    throw new UnsupportedOperationException("Not implemented yet");
   }
 
   @Override
   public TsFileInsertionEvent 
toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
-    return null;
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  @Override
+  public String toString() {
+    return "PipeTsFileInsertionEvent{" + "tsFile=" + tsFile + '}';
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
deleted file mode 100644
index 22cbce9e04..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.indexer;
-
-public interface PipeEventIndexer {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
deleted file mode 100644
index 63aff8a519..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.indexer;
-
-public class PipeIoTEventIndexer {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
deleted file mode 100644
index 4520855085..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.indexer;
-
-public class PipeRatisEventIndexer {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
deleted file mode 100644
index d5add45547..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.indexer;
-
-public class PipeSimpleEventIndexer {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
deleted file mode 100644
index be53e89751..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.indexer;
-
-public class PipeTsFileEventIndexer implements PipeEventIndexer {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
new file mode 100644
index 0000000000..4041e74918
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.realtime;
+
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.EventType;
+
+import java.util.Map;
+
+/**
+ * An {@link Event} wrapper that carries the wrapped event together with its associated {@link
+ * TsFileEpoch} and a device-to-measurements map (the "schema info").
+ *
+ * <p>The schema info can be dropped via {@link #gcSchemaInfo()}; afterwards {@link
+ * #getSchemaInfo()} returns {@code null}.
+ */
+public class PipeRealtimeCollectEvent implements Event {
+
+  private final Event event;
+  private final TsFileEpoch tsFileEpoch;
+
+  // intentionally not final: the reference is released by gcSchemaInfo()
+  private Map<String, String[]> device2Measurements;
+
+  /**
+   * @param event the wrapped event
+   * @param tsFileEpoch the epoch associated with the wrapped event
+   * @param device2Measurements the schema info, keyed by device
+   */
+  public PipeRealtimeCollectEvent(
+      Event event, TsFileEpoch tsFileEpoch, Map<String, String[]> device2Measurements) {
+    this.event = event;
+    this.tsFileEpoch = tsFileEpoch;
+    this.device2Measurements = device2Measurements;
+  }
+
+  /** Returns the type of the wrapped event. */
+  @Override
+  public EventType getType() {
+    return this.event.getType();
+  }
+
+  /** Returns the wrapped event. */
+  public Event getEvent() {
+    return this.event;
+  }
+
+  /** Returns the epoch passed at construction. */
+  public TsFileEpoch getTsFileEpoch() {
+    return this.tsFileEpoch;
+  }
+
+  /** Returns the device-to-measurements map, or {@code null} after {@link #gcSchemaInfo()}. */
+  public Map<String, String[]> getSchemaInfo() {
+    return this.device2Measurements;
+  }
+
+  /** Drops the reference to the schema info so that it can be garbage collected. */
+  public void gcSchemaInfo() {
+    this.device2Measurements = null;
+  }
+
+  @Override
+  public String toString() {
+    return "PipeRealtimeCollectEvent{event=" + this.event + ", tsFileEpoch=" + this.tsFileEpoch + "}";
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
new file mode 100644
index 0000000000..2bb1b3174f
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.realtime;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+
+import java.io.File;
+
+/**
+ * Static factory of {@link PipeRealtimeCollectEvent}. Every created event is bound through one
+ * shared {@link TsFileEpochManager}.
+ */
+public class PipeRealtimeCollectEventFactory {
+
+  private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new TsFileEpochManager();
+
+  private PipeRealtimeCollectEventFactory() {
+    // factory class, do not instantiate
+  }
+
+  /**
+   * Creates a collect event wrapping a {@link PipeTsFileInsertionEvent} built from the file.
+   *
+   * @param tsFile the file handed to the new {@link PipeTsFileInsertionEvent}
+   * @param resource the resource passed to the epoch manager when binding the event
+   * @return the event returned by the epoch manager
+   */
+  // TODO: resource control here?
+  public static PipeRealtimeCollectEvent createCollectEvent(File tsFile, TsFileResource resource) {
+    final PipeTsFileInsertionEvent tsFileInsertionEvent = new PipeTsFileInsertionEvent(tsFile);
+    return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(tsFileInsertionEvent, resource);
+  }
+
+  /**
+   * Creates a collect event wrapping a {@link PipeTabletInsertionEvent} built from the node.
+   *
+   * @param node the insert node handed to the new {@link PipeTabletInsertionEvent}
+   * @param resource the resource passed to the epoch manager when binding the event
+   * @return the event returned by the epoch manager
+   */
+  // TODO: resource control here?
+  public static PipeRealtimeCollectEvent createCollectEvent(
+      InsertNode node, TsFileResource resource) {
+    final PipeTabletInsertionEvent tabletInsertionEvent = new PipeTabletInsertionEvent(node);
+    return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(tabletInsertionEvent, node, resource);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpoch.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpoch.java
new file mode 100644
index 0000000000..b6c71f45d4
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpoch.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.realtime;
+
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Per-TsFile record of the {@link State} that each realtime data region collector has reached.
+ *
+ * <p>A collector that has not been seen before is registered with {@link State#EMPTY} on first
+ * access, both when reading and when migrating its state.
+ */
+public class TsFileEpoch {
+
+  private final String filePath;
+  private final ConcurrentMap<PipeRealtimeDataRegionCollector, AtomicReference<State>>
+      dataRegionCollector2State;
+
+  public TsFileEpoch(String filePath) {
+    this.filePath = filePath;
+    this.dataRegionCollector2State = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * @param collector the collector whose state is requested
+   * @return the state currently recorded for the collector, {@link State#EMPTY} if none was
+   *     recorded before
+   */
+  public TsFileEpoch.State getState(PipeRealtimeDataRegionCollector collector) {
+    return stateReferenceOf(collector).get();
+  }
+
+  /**
+   * Atomically replaces the collector's state with the result of applying the migrator to it.
+   *
+   * @param collector the collector whose state is migrated
+   * @param visitor computes the next state from the current one
+   */
+  public void migrateState(
+      PipeRealtimeDataRegionCollector collector, TsFileEpochStateMigrator visitor) {
+    stateReferenceOf(collector).getAndUpdate(visitor::migrate);
+  }
+
+  /** Returns the state holder of the collector, creating it with EMPTY when absent. */
+  private AtomicReference<State> stateReferenceOf(PipeRealtimeDataRegionCollector collector) {
+    return dataRegionCollector2State.computeIfAbsent(
+        collector, ignored -> new AtomicReference<>(State.EMPTY));
+  }
+
+  @Override
+  public String toString() {
+    return "TsFileEpoch{filePath='"
+        + filePath
+        + "', dataRegionCollector2State="
+        + dataRegionCollector2State
+        + "}";
+  }
+
+  public enum State {
+    EMPTY,
+    USING_TABLET,
+    USING_TSFILE
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
new file mode 100644
index 0000000000..b786977864
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.realtime;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * Binds pipe insertion events to the {@link TsFileEpoch} of the TsFile they belong to.
+ *
+ * <p>Epochs are keyed by TsFile path. Binding a tablet insertion creates the epoch on first use;
+ * binding a TsFile insertion removes it from the manager.
+ *
+ * <p>The epoch map is a {@link ConcurrentHashMap} and each bind method touches it with a single
+ * atomic map operation, because one manager instance is held in a static field of {@code
+ * PipeRealtimeCollectEventFactory} and is therefore shared by every caller of that factory.
+ * NOTE(review): presumably several writer threads reach the factory concurrently; confirm
+ * against the listener's callers.
+ */
+public class TsFileEpochManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TsFileEpochManager.class);
+
+  private static final String[] EMPTY_MEASUREMENT_ARRAY = new String[0];
+
+  private final Map<String, TsFileEpoch> filePath2Epoch = new ConcurrentHashMap<>();
+
+  /**
+   * Wraps a TsFile insertion event together with the epoch of its TsFile and removes that epoch
+   * from the manager.
+   *
+   * @param event the TsFile insertion event to wrap
+   * @param resource supplies the TsFile path and the devices contained in the file
+   * @return a collect event whose schema info maps every device of the resource to an empty
+   *     measurement array
+   */
+  public PipeRealtimeCollectEvent bindPipeTsFileInsertionEvent(
+      PipeTsFileInsertionEvent event, TsFileResource resource) {
+    final String filePath = resource.getTsFilePath();
+
+    // TODO: we have to make sure that the TsFileInsertionEvent is the last event of the
+    // TsFileEpoch's life cycle
+    TsFileEpoch epoch = filePath2Epoch.remove(filePath);
+
+    // this would not happen, but just in case
+    if (epoch == null) {
+      LOGGER.warn("PipeEngine: can not find TsFileEpoch for TsFile {}, create it", filePath);
+      // the fallback epoch is handed to the event only; it is not registered in the map
+      epoch = new TsFileEpoch(filePath);
+    }
+
+    return new PipeRealtimeCollectEvent(
+        event,
+        epoch,
+        resource.getDevices().stream()
+            .collect(Collectors.toMap(device -> device, device -> EMPTY_MEASUREMENT_ARRAY)));
+  }
+
+  /**
+   * Wraps a tablet insertion event together with the epoch of the TsFile it is written to,
+   * creating the epoch if the manager does not hold one for that file yet.
+   *
+   * @param event the tablet insertion event to wrap
+   * @param node supplies the device path and measurements of the insertion
+   * @param resource supplies the TsFile path used as the epoch key
+   * @return a collect event whose schema info maps the node's device to its measurements
+   */
+  public PipeRealtimeCollectEvent bindPipeTabletInsertionEvent(
+      PipeTabletInsertionEvent event, InsertNode node, TsFileResource resource) {
+    return new PipeRealtimeCollectEvent(
+        event,
+        filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), TsFileEpoch::new),
+        Collections.singletonMap(node.getDevicePath().getFullPath(), node.getMeasurements()));
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochStateMigrator.java
similarity index 81%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochStateMigrator.java
index b5fad778b0..f75f7c87e5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochStateMigrator.java
@@ -17,6 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.collector.realtime;
+package org.apache.iotdb.db.pipe.core.event.realtime;
 
-public class PipeRealtimeCollector {}
+/**
+ * Computes the next {@link TsFileEpoch.State} from the current one.
+ *
+ * <p>{@code TsFileEpoch#migrateState} passes {@code migrate} to {@code
+ * AtomicReference#getAndUpdate}, which may apply the function more than once when updates
+ * contend, so implementations should be free of side effects.
+ */
+@FunctionalInterface
+public interface TsFileEpochStateMigrator {
+  /**
+   * @param state the state currently recorded
+   * @return the state to record in its place
+   */
+  TsFileEpoch.State migrate(final TsFileEpoch.State state);
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
similarity index 97%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
index 43b438445f..6c9a5c55f8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.event.access;
+package org.apache.iotdb.db.pipe.core.event.view.access;
 
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRowIterator.java
similarity index 96%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRowIterator.java
index 960214bea6..73ee4d041a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRowIterator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.event.access;
+package org.apache.iotdb.db.pipe.core.event.view.access;
 
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.access.RowIterator;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
similarity index 82%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 2c2bbfd38f..7cc0778193 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -17,12 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.event.collector;
+package org.apache.iotdb.db.pipe.core.event.view.collector;
 
 import org.apache.iotdb.pipe.api.collector.EventCollector;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import java.io.IOException;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
similarity index 94%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
index 525e79c137..687c3e72c1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.event.collector;
+package org.apache.iotdb.db.pipe.core.event.view.collector;
 
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
similarity index 58%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
index d11b4f780d..fc9ed29eaf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
@@ -17,20 +17,29 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.event;
+package org.apache.iotdb.db.pipe.resource;
 
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import java.io.File;
 
-public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
+/**
+ * Singleton entry point for pipe file references.
+ *
+ * <p>Both reference methods currently return the given file unchanged.
+ */
+public class PipeResourceManager {
 
-  @Override
-  public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
-    return null;
+  // Currently a pass-through: returns the given file as is.
+  public File addFileReference(File file) {
+    return file;
   }
 
-  @Override
-  public TsFileInsertionEvent 
toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
-    return null;
+  // Currently a pass-through: returns the given file as is.
+  public File removeFileReference(File file) {
+    return file;
+  }
+
+  ///////////////////////////// SINGLETON /////////////////////////////
+
+  private PipeResourceManager() {}
+
+  public static PipeResourceManager getInstance() {
+    return PipeResourceManagerHolder.INSTANCE;
+  }
+
+  // Holder class: the instance is created when this nested class is initialized.
+  private static class PipeResourceManagerHolder {
+    private static final PipeResourceManager INSTANCE = new 
PipeResourceManager();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index e6764cf699..b420d9e51a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index aa8772f2a1..34f8045cb6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.db.pipe.task.subtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
new file mode 100644
index 0000000000..4a9abbef61
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector;
+
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Smoke and throughput test for {@link CachedSchemaPatternMatcher}.
+ *
+ * <p>Collectors are registered on a separate single-thread executor while the test thread keeps
+ * calling {@code match}. Each matched event is checked by {@link
+ * PipeRealtimeFakeDataRegionCollector#collect}.
+ */
+public class CachedSchemaPatternMatcherTest {
+
+  private CachedSchemaPatternMatcher matcher;
+  private ExecutorService executorService;
+  private List<PipeRealtimeDataRegionCollector> collectorList;
+
+  @Before
+  public void setUp() {
+    matcher = new CachedSchemaPatternMatcher();
+    executorService = Executors.newSingleThreadExecutor();
+    collectorList = new ArrayList<>();
+  }
+
+  @After
+  public void tearDown() {
+    executorService.shutdownNow();
+  }
+
+  @Test
+  public void testCachedMatcher() throws ExecutionException, InterruptedException {
+    // one collector on "root", ten on "root.<i>" and a hundred on "root.<i>.<j>"
+    PipeRealtimeDataRegionCollector databaseCollector =
+        new PipeRealtimeFakeDataRegionCollector("root", "1");
+    collectorList.add(databaseCollector);
+
+    int deviceCollectorNum = 10;
+    int seriesCollectorNum = 10;
+    for (int i = 0; i < deviceCollectorNum; i++) {
+      PipeRealtimeDataRegionCollector deviceCollector =
+          new PipeRealtimeFakeDataRegionCollector("root." + i, "1");
+      collectorList.add(deviceCollector);
+      for (int j = 0; j < seriesCollectorNum; j++) {
+        PipeRealtimeDataRegionCollector seriesCollector =
+            new PipeRealtimeFakeDataRegionCollector("root." + i + "." + j, "1");
+        collectorList.add(seriesCollector);
+      }
+    }
+
+    // registration runs in the background while the loop below is matching
+    Future<?> future =
+        executorService.submit(
+            () -> collectorList.forEach(collector -> matcher.register(collector)));
+
+    int epochNum = 10000;
+    int deviceNum = 1000;
+    int seriesNum = 100;
+    Map<String, String[]> deviceMap =
+        IntStream.range(0, deviceNum)
+            .mapToObj(String::valueOf)
+            .collect(Collectors.toMap(s -> "root." + s, s -> new String[0]));
+    String[] measurements =
+        IntStream.range(0, seriesNum).mapToObj(String::valueOf).toArray(String[]::new);
+
+    // System.nanoTime() is used because a single match call usually takes far less than a
+    // millisecond, so summing System.currentTimeMillis() differences would mostly add zeros
+    long totalTimeInNanos = 0;
+    for (int i = 0; i < epochNum; i++) {
+      for (int j = 0; j < deviceNum; j++) {
+        // NOTE(review): the key uses the epoch index i, so every iteration of this inner loop
+        // matches the same device; confirm that j was not intended
+        PipeRealtimeCollectEvent event =
+            new PipeRealtimeCollectEvent(
+                null, null, Collections.singletonMap("root." + i, measurements));
+        long startTime = System.nanoTime();
+        matcher.match(event);
+        totalTimeInNanos += (System.nanoTime() - startTime);
+      }
+      PipeRealtimeCollectEvent event = new PipeRealtimeCollectEvent(null, null, deviceMap);
+      long startTime = System.nanoTime();
+      matcher.match(event);
+      totalTimeInNanos += (System.nanoTime() - startTime);
+    }
+
+    final long matchCount = (long) epochNum * (deviceNum + 1);
+    // totalTime is still reported in milliseconds
+    final long totalTime = totalTimeInNanos / 1_000_000L;
+    System.out.println("matcher.getRegisterCount() = " + matcher.getRegisterCount());
+    System.out.println("totalTime = " + totalTime);
+    System.out.println(
+        "device match per second = " + (matchCount * 1_000_000_000.0 / totalTimeInNanos));
+
+    future.get();
+  }
+
+  /**
+   * Collector stub that supplies nothing and asserts that every event it is given really matches
+   * its pattern.
+   */
+  public static class PipeRealtimeFakeDataRegionCollector extends PipeRealtimeDataRegionCollector {
+
+    public PipeRealtimeFakeDataRegionCollector(String pattern, String dataRegionId) {
+      super(pattern, dataRegionId);
+    }
+
+    @Override
+    public Event supply() {
+      return null;
+    }
+
+    /**
+     * Passes when at least one schema entry matches the pattern: with measurements present, some
+     * "device.measurement" path must start with the pattern; with an empty measurement array,
+     * the device and the pattern must be prefixes of one another in either direction.
+     */
+    @Override
+    public void collect(PipeRealtimeCollectEvent event) {
+      // single-element array so the lambda below can update the flag
+      final boolean[] match = {false};
+      event
+          .getSchemaInfo()
+          .forEach(
+              (k, v) -> {
+                if (v.length > 0) {
+                  for (String s : v) {
+                    match[0] =
+                        match[0]
+                            || (k + TsFileConstant.PATH_SEPARATOR + s).startsWith(getPattern());
+                  }
+                } else {
+                  match[0] = match[0] || (getPattern().startsWith(k) || k.startsWith(getPattern()));
+                }
+              });
+      Assert.assertTrue(match[0]);
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
new file mode 100644
index 0000000000..3d395456d2
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeHybridDataRegionCollector;
+import 
org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.EventType;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/**
+ * End-to-end test of the realtime collect path: insert nodes and TsFiles are pushed into {@link
+ * PipeInsertionDataNodeListener} from writer threads while listener threads poll the collectors
+ * and count the events they receive.
+ *
+ * <p>Events are counted by weight so that each listener knows when it has seen everything: the
+ * listeners on {@code pattern1} count a tablet insertion as 1 and any other event as 2, the
+ * listeners on {@code pattern2} count every event as 1.
+ */
+public class PipeRealtimeCollectTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeCollectTest.class);
+
+  private final String dataRegion1 = "dataRegion-1";
+  private final String dataRegion2 = "dataRegion-2";
+  private final String pattern1 = "root.sg.d";
+  private final String pattern2 = "root.sg.d.a";
+  private final String[] device = new String[] {"root", "sg", "d"};
+  // cleared on timeout so that the polling listeners stop
+  private final AtomicBoolean alive = new AtomicBoolean();
+
+  private ExecutorService writeService;
+  private ExecutorService listenerService;
+
+  @Before
+  public void setUp() {
+    writeService = Executors.newFixedThreadPool(2);
+    listenerService = Executors.newFixedThreadPool(4);
+  }
+
+  @After
+  public void tearDown() {
+    writeService.shutdownNow();
+    listenerService.shutdownNow();
+  }
+
+  @Test
+  public void testRealtimeCollectProcess() throws ExecutionException, InterruptedException {
+    // set up realtime collector
+
+    try (PipeRealtimeHybridDataRegionCollector collector1 =
+            new PipeRealtimeHybridDataRegionCollector(pattern1, dataRegion1);
+        PipeRealtimeHybridDataRegionCollector collector2 =
+            new PipeRealtimeHybridDataRegionCollector(pattern2, dataRegion1);
+        PipeRealtimeHybridDataRegionCollector collector3 =
+            new PipeRealtimeHybridDataRegionCollector(pattern1, dataRegion2);
+        PipeRealtimeHybridDataRegionCollector collector4 =
+            new PipeRealtimeHybridDataRegionCollector(pattern2, dataRegion2)) {
+
+      PipeRealtimeDataRegionCollector[] collectors =
+          new PipeRealtimeDataRegionCollector[] {collector1, collector2, collector3, collector4};
+
+      // start collector 0, 1
+      collectors[0].start();
+      collectors[1].start();
+
+      // test result of collector 0, 1
+      int writeNum = 10;
+      List<Future<?>> writeFutures =
+          Arrays.asList(
+              write2DataRegion(writeNum, dataRegion1), write2DataRegion(writeNum, dataRegion2));
+
+      alive.set(true);
+      List<Future<?>> listenFutures =
+          Arrays.asList(
+              listen(
+                  collectors[0],
+                  type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+                  writeNum << 1),
+              listen(collectors[1], type -> 1, writeNum));
+
+      awaitListeners(listenFutures);
+      awaitWriters(writeFutures);
+
+      // start collector 2, 3
+      collectors[2].start();
+      collectors[3].start();
+
+      // test result of collector 0 - 3
+      writeFutures =
+          Arrays.asList(
+              write2DataRegion(writeNum, dataRegion1), write2DataRegion(writeNum, dataRegion2));
+
+      alive.set(true);
+      listenFutures =
+          Arrays.asList(
+              listen(
+                  collectors[0],
+                  type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+                  writeNum << 1),
+              listen(collectors[1], type -> 1, writeNum),
+              listen(
+                  collectors[2],
+                  type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+                  writeNum << 1),
+              listen(collectors[3], type -> 1, writeNum));
+
+      awaitListeners(listenFutures);
+      awaitWriters(writeFutures);
+    }
+  }
+
+  /**
+   * Waits for every listener in order, up to 10 minutes each. On timeout the listeners are told
+   * to stop through {@code alive} and the test fails.
+   */
+  private void awaitListeners(List<Future<?>> listenFutures)
+      throws ExecutionException, InterruptedException {
+    try {
+      for (Future<?> listenFuture : listenFutures) {
+        listenFuture.get(10, TimeUnit.MINUTES);
+      }
+    } catch (TimeoutException e) {
+      LOGGER.warn("Time out when listening collector", e);
+      alive.set(false);
+      Assert.fail();
+    }
+  }
+
+  /** Waits for every writer; failures surface as the exceptions the test method declares. */
+  private void awaitWriters(List<Future<?>> writeFutures)
+      throws ExecutionException, InterruptedException {
+    for (Future<?> writeFuture : writeFutures) {
+      writeFuture.get();
+    }
+  }
+
+  /**
+   * Submits a writer that, for each of {@code writeNum} TsFile resources, reports one insert on
+   * measurement "a", one on measurement "b" and then the TsFile itself to the listener.
+   */
+  private Future<?> write2DataRegion(int writeNum, String dataRegionId) {
+    return writeService.submit(
+        () -> {
+          for (int i = 0; i < writeNum; ++i) {
+            TsFileResource resource =
+                new TsFileResource(new File(dataRegionId, String.format("%s-%s-0-0.tsfile", i, i)));
+            resource.updateStartTime(String.join(TsFileConstant.PATH_SEPARATOR, device), 0);
+
+            PipeInsertionDataNodeListener.getInstance()
+                .listenToInsertNode(dataRegionId, newInsertRowNode(i, "a"), resource);
+            PipeInsertionDataNodeListener.getInstance()
+                .listenToInsertNode(dataRegionId, newInsertRowNode(i, "b"), resource);
+            PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, resource);
+          }
+        });
+  }
+
+  /** Builds the single-measurement insert node on the test device used by the writers. */
+  private InsertRowNode newInsertRowNode(int planNodeId, String measurement) {
+    return new InsertRowNode(
+        new PlanNodeId(String.valueOf(planNodeId)),
+        new PartialPath(device),
+        false,
+        new String[] {measurement},
+        null,
+        0,
+        null,
+        false);
+  }
+
+  /**
+   * Submits a listener that polls the collector until the weighted event count reaches {@code
+   * expectNum} or {@code alive} is cleared, then asserts that the count is exactly {@code
+   * expectNum}.
+   *
+   * @param collector the collector to poll
+   * @param weight the amount each event type contributes to the count
+   * @param expectNum the weighted count at which the listener stops
+   */
+  private Future<?> listen(
+      PipeRealtimeDataRegionCollector collector,
+      Function<EventType, Integer> weight,
+      int expectNum) {
+    return listenerService.submit(
+        () -> {
+          int eventNum = 0;
+          while (alive.get() && eventNum < expectNum) {
+            Event event;
+            try {
+              event = collector.supply();
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+            if (event != null) {
+              eventNum += weight.apply(event.getType());
+            }
+          }
+          // asserted after the loop rather than in a finally block, so that a failure thrown by
+          // supply() is reported as itself instead of being replaced by an AssertionError
+          Assert.assertEquals(expectNum, eventNum);
+        });
+  }
+}
diff --git a/server/src/test/resources/logback-test.xml 
b/server/src/test/resources/logback-test.xml
index d70151b7dd..eeab97f16b 100644
--- a/server/src/test/resources/logback-test.xml
+++ b/server/src/test/resources/logback-test.xml
@@ -40,6 +40,7 @@
     <!-- enable me if you want to monitor when files are opened and closed.
     <logger name="FileMonitor" level="info"/>
     -->
+    <logger name="org.apache.iotdb.db.pipe" level="INFO"/>
     <logger name="org.apache.iotdb.db.sync" level="INFO"/>
     <logger name="org.apache.iotdb.db.engine.merge" level="INFO"/>
     <logger name="org.apache.iotdb.db.metadata" level="INFO"/>

Reply via email to