This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ed8d539602 [IOTDB-5739] Pipe realtime event process: listener ->
assigner -> matcher -> collector (#9479)
ed8d539602 is described below
commit ed8d5396025f44d7336eccacfe304265114876e7
Author: yschengzi <[email protected]>
AuthorDate: Thu Apr 27 09:47:17 2023 +0800
[IOTDB-5739] Pipe realtime event process: listener -> assigner -> matcher
-> collector (#9479)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../builtin/connector/DoNothingConnector.java | 6 +-
.../builtin/processor/DoNothingProcessor.java | 6 +-
.../org/apache/iotdb/pipe/api/PipeCollector.java | 99 +++++++++
.../org/apache/iotdb/pipe/api/PipeConnector.java | 6 +-
.../org/apache/iotdb/pipe/api/PipeProcessor.java | 6 +-
.../iotdb/pipe/api/collector/EventCollector.java | 6 +-
.../iotdb/pipe/api/collector/RowCollector.java | 2 +-
.../PipeCollectorRuntimeConfiguration.java | 13 +-
.../org/apache/iotdb/pipe/api/event/Event.java | 6 +-
.../org/apache/iotdb/pipe/api/event/EventType.java | 8 +-
.../event/{ => dml}/deletion/DeletionEvent.java | 8 +-
.../{ => dml}/insertion/TabletInsertionEvent.java | 8 +-
.../{ => dml}/insertion/TsFileInsertionEvent.java | 8 +-
pom.xml | 6 +
server/pom.xml | 6 +-
.../db/engine/storagegroup/TsFileProcessor.java | 17 +-
.../apache/iotdb/db/pipe/config/PipeConfig.java | 65 ++++++
.../collector/PipeCollectorEventPendingQueue.java | 22 --
.../realtime/PipeRealtimeDataRegionCollector.java | 81 +++++++
.../PipeRealtimeHybridDataRegionCollector.java | 171 +++++++++++++++
.../realtime/assigner/DisruptorQueue.java | 111 ++++++++++
.../realtime/assigner/PipeDataRegionAssigner.java | 74 +++++++
.../realtime/cache/PipeRealtimeEventCache.java | 22 --
.../realtime/listener/IoTLogListerner.java | 22 --
.../listener/PipeInsertionDataNodeListener.java | 115 ++++++++++
.../realtime/listener/RatisLogListener.java | 22 --
.../realtime/listener/SimpleLogListener.java | 22 --
.../listener/TsFileGenerationListener.java | 22 --
.../matcher/CachedSchemaPatternMatcher.java | 200 +++++++++++++++++
.../realtime/matcher/PipeDataRegionMatcher.java | 47 ++++
.../pipe/core/collector/realtime/matcher/Rule.java | 22 --
.../realtime/matcher/RulePrefixMatchTree.java | 22 --
.../collector/realtime/recorder/TsFileEpoch.java | 22 --
.../realtime/recorder/TsFileEpochRecorder.java | 22 --
.../event/{ => impl}/PipeTabletInsertionEvent.java | 22 +-
.../event/{ => impl}/PipeTsFileInsertionEvent.java | 23 +-
.../pipe/core/event/indexer/PipeEventIndexer.java | 22 --
.../core/event/indexer/PipeIoTEventIndexer.java | 22 --
.../core/event/indexer/PipeRatisEventIndexer.java | 22 --
.../core/event/indexer/PipeSimpleEventIndexer.java | 22 --
.../core/event/indexer/PipeTsFileEventIndexer.java | 22 --
.../event/realtime/PipeRealtimeCollectEvent.java | 66 ++++++
.../realtime/PipeRealtimeCollectEventFactory.java | 49 +++++
.../db/pipe/core/event/realtime/TsFileEpoch.java | 68 ++++++
.../core/event/realtime/TsFileEpochManager.java | 70 ++++++
.../realtime/TsFileEpochStateMigrator.java} | 7 +-
.../pipe/core/event/{ => view}/access/PipeRow.java | 2 +-
.../event/{ => view}/access/PipeRowIterator.java | 2 +-
.../{ => view}/collector/PipeEventCollector.java | 8 +-
.../{ => view}/collector/PipeRowCollector.java | 2 +-
.../PipeResourceManager.java} | 29 ++-
.../db/pipe/task/subtask/PipeConnectorSubtask.java | 6 +-
.../db/pipe/task/subtask/PipeProcessorSubtask.java | 6 +-
.../collector/CachedSchemaPatternMatcherTest.java | 149 +++++++++++++
.../core/collector/PipeRealtimeCollectTest.java | 236 +++++++++++++++++++++
server/src/test/resources/logback-test.xml | 1 +
56 files changed, 1753 insertions(+), 398 deletions(-)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
index 503005fefa..00f3b80424 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
public class DoNothingConnector implements PipeConnector {
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
index 45634119db..6a18f9a64e 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import java.io.IOException;
diff --git
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
new file mode 100644
index 0000000000..db08728faf
--- /dev/null
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.api;
+
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.Event;
+
+/**
+ * PipeCollector
+ *
+ * <p>PipeCollector is responsible for capturing events from sources.
+ *
+ * <p>Various data sources can be supported by implementing different
PipeCollector classes.
+ *
+ * <p>The lifecycle of a PipeCollector is as follows:
+ *
+ * <ul>
+ * <li>When a collaboration task is created, the KV pairs of the `WITH
COLLECTOR` clause in SQL are
+ * parsed and the validation method {@link
PipeCollector#validate(PipeParameterValidator)}
+ * will be called to validate the parameters.
+ * <li>Before the collaboration task starts, the method {@link
+ * PipeCollector#customize(PipeParameters,
PipeCollectorRuntimeConfiguration)} will be called
+ * to configure the runtime behavior of the PipeCollector.
+ * <li>Then the method {@link PipeCollector#start()} will be called to start
the PipeCollector.
+ * <li>While the collaboration task is in progress, the method {@link
PipeCollector#supply()} will
+ * be called to capture events from sources and then the events will be
passed to the
+ * PipeProcessor.
+ * <li>The method {@link PipeCollector#close()} will be called when the
collaboration task is
+ * cancelled (the `DROP PIPE` command is executed).
+ * </ul>
+ */
+public interface PipeCollector extends PipePlugin {
+
+ /**
+ * This method is mainly used to validate {@link PipeParameters} and it is
executed before {@link
+ * PipeCollector#customize(PipeParameters,
PipeCollectorRuntimeConfiguration)} is called.
+ *
+ * @param validator the validator used to validate {@link PipeParameters}
+ * @throws Exception if any parameter is not valid
+ */
+ void validate(PipeParameterValidator validator) throws Exception;
+
+ /**
+ * This method is mainly used to customize PipeCollector. In this method,
the user can do the
+ * following things:
+ *
+ * <ul>
+ * <li>Use PipeParameters to parse key-value pair attributes entered by
the user.
+ * <li>Set the running configurations in PipeCollectorRuntimeConfiguration.
+ * </ul>
+ *
+ * <p>This method is called after the method {@link
+ * PipeCollector#validate(PipeParameterValidator)} is called.
+ *
+ * @param parameters used to parse the input parameters entered by the user
+ * @param configuration used to set the required properties of the running
PipeCollector
+ * @throws Exception the user can throw errors if necessary
+ */
+ void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration
configuration)
+ throws Exception;
+
+ /**
+ * Start the collector. After this method is called, events should be ready
to be supplied by
+ * {@link PipeCollector#supply()}. This method is called after {@link
+ * PipeCollector#customize(PipeParameters,
PipeCollectorRuntimeConfiguration)} is called.
+ *
+ * @throws Exception the user can throw errors if necessary
+ */
+ void start() throws Exception;
+
+ /**
+ * Supply a single event from the collector; the caller will send the event
to the processor.
+ * This method is called after {@link PipeCollector#start()} is called.
+ *
+ * @return the event to be supplied. The event may be null if the collector
has no events available at
+ * the moment but is still running and may supply more events later.
+ * @throws Exception the user can throw errors if necessary
+ */
+ Event supply() throws Exception;
+}
diff --git
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index 78b42c5c70..2c9bbff5aa 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.pipe.api;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import
org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
/**
* PipeConnector
diff --git
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
index ef0b585eae..16a5e81ba6 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
/**
* PipeProcessor
diff --git
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
index 17b8421e95..2e53693d65 100644
---
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
+++
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.pipe.api.collector;
import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import java.io.IOException;
diff --git
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
index ee32de275e..bdd96be926 100644
---
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
+++
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.pipe.api.collector;
import org.apache.iotdb.pipe.api.access.Row;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import java.io.IOException;
import java.util.function.BiConsumer;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java
similarity index 70%
copy from
server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
copy to
pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java
index 525e79c137..e99d21be71 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
+++
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java
@@ -17,15 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.event.collector;
+package org.apache.iotdb.pipe.api.customizer.collector;
-import org.apache.iotdb.pipe.api.access.Row;
-import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.exception.PipeException;
-import java.io.IOException;
-
-public class PipeRowCollector implements RowCollector {
+// TODO: complete this class
+public class PipeCollectorRuntimeConfiguration implements
PipeRuntimeConfiguration {
@Override
- public void collectRow(Row row) throws IOException {}
+ public void check() throws PipeException {}
}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
index 74ddf9e47d..f5d8d2fbcd 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
@@ -20,4 +20,8 @@
package org.apache.iotdb.pipe.api.event;
/** This interface is used to abstract events in collaboration tasks. */
-public interface Event {}
+public interface Event {
+
+ /** @return the type of the event */
+ EventType getType();
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/EventType.java
similarity index 87%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
rename to pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/EventType.java
index e1c50a9601..20be75367d 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/EventType.java
@@ -17,6 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.collector;
+package org.apache.iotdb.pipe.api.event;
-public class PipeCollectorEventSelector {}
+public enum EventType {
+ TABLET_INSERTION,
+ TSFILE_INSERTION,
+ DELETION,
+}
diff --git
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/deletion/DeletionEvent.java
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
similarity index 87%
rename from
pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/deletion/DeletionEvent.java
rename to
pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
index 8a32339cf4..d1fb966379 100644
---
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/deletion/DeletionEvent.java
+++
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.event.deletion;
+package org.apache.iotdb.pipe.api.event.dml.deletion;
import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.EventType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -39,4 +40,9 @@ public interface DeletionEvent extends Event {
* @return TimeRange
*/
TimeRange getTimeRange();
+
+ @Override
+ default EventType getType() {
+ return EventType.DELETION;
+ }
}
diff --git
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TabletInsertionEvent.java
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
similarity index 91%
rename from
pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TabletInsertionEvent.java
rename to
pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index 5880f0fa71..e353bf397e 100644
---
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TabletInsertionEvent.java
+++
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -17,11 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.event.insertion;
+package org.apache.iotdb.pipe.api.event.dml.insertion;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.EventType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import java.util.Iterator;
@@ -53,4 +54,9 @@ public interface TabletInsertionEvent extends Event {
* RowCollector
*/
TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer);
+
+ @Override
+ default EventType getType() {
+ return EventType.TABLET_INSERTION;
+ }
}
diff --git
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TsFileInsertionEvent.java
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
similarity index 89%
rename from
pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TsFileInsertionEvent.java
rename to
pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
index 7f324b8de6..2d5badc807 100644
---
a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/insertion/TsFileInsertionEvent.java
+++
b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.pipe.api.event.insertion;
+package org.apache.iotdb.pipe.api.event.dml.insertion;
import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.EventType;
/**
* TsFileInsertionEvent is used to define the event of writing TsFile. Event
data stores in disks,
@@ -41,4 +42,9 @@ public interface TsFileInsertionEvent extends Event {
* @return TsFileInsertionEvent
*/
TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent>
iterable);
+
+ @Override
+ default EventType getType() {
+ return EventType.TSFILE_INSERTION;
+ }
}
diff --git a/pom.xml b/pom.xml
index d77888658f..a4e1f696de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,6 +149,7 @@
<thrift.version>0.14.1</thrift.version>
<airline.version>0.8</airline.version>
<jackson.version>2.13.5</jackson.version>
+ <disrupter.version>3.4.2</disrupter.version>
<jackson.databind.version>2.13.4.2</jackson.databind.version>
<antlr4.version>4.8-1</antlr4.version>
<common.cli.version>1.3.1</common.cli.version>
@@ -283,6 +284,11 @@
<artifactId>guava</artifactId>
<version>[${guava.version},)</version>
</dependency>
+ <dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>${disrupter.version}</version>
+ </dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
diff --git a/server/pom.xml b/server/pom.xml
index 187f94872f..fbda65880f 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -212,7 +212,6 @@
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
@@ -222,7 +221,10 @@
<groupId>org.apache.iotdb</groupId>
<artifactId>node-commons</artifactId>
<version>${project.version}</version>
- <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
</dependency>
</dependencies>
<build>
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 395c372ac3..f4f33c1047 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
@@ -278,6 +279,11 @@ public class TsFileProcessor {
workMemTable.insert(insertRowNode);
}
+ // collect plan node in pipe
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToInsertNode(
+ dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode,
tsFileResource);
+
// update start time of this memtable
tsFileResource.updateStartTime(
insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime());
@@ -379,13 +385,17 @@ public class TsFileProcessor {
}
throw new WriteProcessException(e);
}
-
for (int i = start; i < end; i++) {
results[i] = RpcUtils.SUCCESS_STATUS;
}
+
+ // collect plan node in pipe
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToInsertNode(
+ dataRegionInfo.getDataRegion().getDataRegionId(),
insertTabletNode, tsFileResource);
+
tsFileResource.updateStartTime(
insertTabletNode.getDeviceID().toStringID(),
insertTabletNode.getTimes()[start]);
-
// for sequence tsfile, we update the endTime only when the file is
prepared to be closed.
// for unsequence tsfile, we have to update the endTime for each insertion.
if (!sequence) {
@@ -843,6 +853,9 @@ public class TsFileProcessor {
.getOrCreateSyncManager(dataRegionInfo.getDataRegion().getDataRegionId())) {
syncManager.syncRealTimeTsFile(tsFileResource.getTsFile());
}
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToTsFile(dataRegionInfo.getDataRegion().getDataRegionId(),
tsFileResource);
+
// When invoke closing TsFile after insert data to memTable, we
shouldn't flush until invoke
// flushing memTable in System module.
addAMemtableIntoFlushingList(tmpMemTable);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
new file mode 100644
index 0000000000..49a34d03e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.config;
+
+// TODO: make these parameters configurable
+// TODO: put all pipe-related parameters in one place
+// TODO: set the default value of the parameters in IoTDBDescriptor
+// TODO: move it to common module?
+public class PipeConfig {
+
+ private final int defaultRingBufferSize = 65536;
+
+ private final int matcherCacheSize = 1024;
+
+ private final int realtimeCollectorPendingQueueCapacity = 65536;
+
+ // this should be less than or equal to
realtimeCollectorPendingQueueCapacity
+ private final int realtimeCollectorPendingQueueTabletLimit =
+ realtimeCollectorPendingQueueCapacity / 2;
+
+ public int getDefaultRingBufferSize() {
+ return defaultRingBufferSize;
+ }
+
+ public int getMatcherCacheSize() {
+ return matcherCacheSize;
+ }
+
+ public int getRealtimeCollectorPendingQueueCapacity() {
+ return realtimeCollectorPendingQueueCapacity;
+ }
+
+ public int getRealtimeCollectorPendingQueueTabletLimit() {
+ return realtimeCollectorPendingQueueTabletLimit;
+ }
+
+ /////////////////////////////// Singleton ///////////////////////////////
+
+ private PipeConfig() {}
+
+ public static PipeConfig getInstance() {
+ return PipeConfigHolder.INSTANCE;
+ }
+
+ private static class PipeConfigHolder {
+ private static final PipeConfig INSTANCE = new PipeConfig();
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
deleted file mode 100644
index c3bfb68015..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector;
-
-public class PipeCollectorEventPendingQueue {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
new file mode 100644
index 0000000000..452831c6d6
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime;
+
+import
org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+public abstract class PipeRealtimeDataRegionCollector implements PipeCollector
{
+
+ protected final String pattern;
+ protected final String dataRegionId;
+
+ public PipeRealtimeDataRegionCollector(String pattern, String dataRegionId) {
+ this.pattern = pattern;
+ this.dataRegionId = dataRegionId;
+ }
+
+ @Override
+ public final void validate(PipeParameterValidator validator) throws
PipeException {
+ // TODO: complete this method
+ }
+
+ @Override
+ public void customize(
+ PipeParameters parameters, PipeCollectorRuntimeConfiguration
configuration) {
+ // TODO: complete this method
+ }
+
+ @Override
+ public void start() {
+ // TODO: if the collector is not started, start it. if the collector is
started, do nothing.
+
PipeInsertionDataNodeListener.getInstance().startListenAndAssign(dataRegionId,
this);
+ }
+
+ @Override
+ public void close() {
+ // TODO: if the collector is not closed, close it. if the collector is
closed, do nothing.
+
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId,
this);
+ }
+
+ /** @param event the event from the storage engine */
+ public abstract void collect(PipeRealtimeCollectEvent event);
+
+ public final String getPattern() {
+ return pattern;
+ }
+
+ @Override
+ public String toString() {
+ return "PipeRealtimeDataRegionCollector{"
+ + "pattern='"
+ + pattern
+ + '\''
+ + ", dataRegionId='"
+ + dataRegionId
+ + '\''
+ + '}';
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeHybridDataRegionCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeHybridDataRegionCollector.java
new file mode 100644
index 0000000000..76aeebc530
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeHybridDataRegionCollector.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime;
+
+import org.apache.iotdb.db.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+// TODO: make this collector a builtin pipe plugin and register it in BuiltinPipePlugin.
+public class PipeRealtimeHybridDataRegionCollector extends
PipeRealtimeDataRegionCollector {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PipeRealtimeHybridDataRegionCollector.class);
+
+ // TODO: memory control
+ // This queue is used to store pending events collected by the method
collect(). The method
+ // supply() will poll events from this queue and send them to the next pipe
plugin.
+ private final ArrayBlockingQueue<PipeRealtimeCollectEvent> pendingQueue;
+
+ public PipeRealtimeHybridDataRegionCollector(String pattern, String
dataRegionId) {
+ super(pattern, dataRegionId);
+ this.pendingQueue =
+ new ArrayBlockingQueue<>(
+
PipeConfig.getInstance().getRealtimeCollectorPendingQueueCapacity());
+ }
+
+ @Override
+ public void collect(PipeRealtimeCollectEvent event) {
+ switch (event.getEvent().getType()) {
+ case TABLET_INSERTION:
+ collectTabletInsertion(event);
+ break;
+ case TSFILE_INSERTION:
+ collectTsFileInsertion(event);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported event type %s for Hybrid Realtime Collector %s",
+ event.getEvent().getType(), this));
+ }
+ }
+
+ /**
+  * Puts a tablet insertion event into the pending queue, unless the data it represents is going
+  * to be carried by the TsFile event of the same TsFileEpoch.
+  *
+  * <p>Whenever a tablet event is not enqueued (the queue is approaching capacity, or the offer is
+  * rejected because the queue is full), the epoch is migrated to USING_TSFILE so that
+  * supplyTsFileInsertion() will later hand out the TsFile event instead of discarding it.
+  *
+  * @param event the tablet insertion event from the storage engine
+  */
+ private void collectTabletInsertion(PipeRealtimeCollectEvent event) {
+   if (isApproachingCapacity()) {
+     // if the pending queue is approaching capacity, we should not collect any more tablet
+     // events. all the data represented by the tablet events should be carried by the following
+     // tsfile event.
+     event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
+     return;
+   }
+
+   if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) {
+     // the epoch has already fallen back to the tsfile event, nothing to enqueue.
+     return;
+   }
+
+   if (!pendingQueue.offer(event)) {
+     // the queue was full although the tablet limit was not reached (e.g. the limit is configured
+     // to be >= the capacity). do not drop the data silently: fall back to the tsfile event, in
+     // the same way as the approaching-capacity branch above.
+     event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
+     LOGGER.warn(
+         "Pending Queue of Hybrid Realtime Collector {} is full, tablet event {} will be carried"
+             + " by the following TsFile event",
+         this,
+         event);
+   }
+ }
+
+ private void collectTsFileInsertion(PipeRealtimeCollectEvent event) {
+ event
+ .getTsFileEpoch()
+ .migrateState(
+ this,
+ state ->
+ state.equals(TsFileEpoch.State.EMPTY) ?
TsFileEpoch.State.USING_TSFILE : state);
+
+ if (!pendingQueue.offer(event)) {
+ LOGGER.warn(
+ String.format(
+ "Pending Queue of Hybrid Realtime Collector %s has reached
capacity, discard TsFile Event %s, current state %s",
+ this, event, event.getTsFileEpoch().getState(this)));
+ // TODO: more degradation strategies
+ // TODO: dynamic control of the pending queue capacity
+ // TODO: should be handled by the PipeRuntimeAgent
+ }
+ }
+
+ private boolean isApproachingCapacity() {
+ return pendingQueue.size()
+ >=
PipeConfig.getInstance().getRealtimeCollectorPendingQueueTabletLimit();
+ }
+
+ @Override
+ public Event supply() {
+ PipeRealtimeCollectEvent collectEvent = pendingQueue.poll();
+
+ while (collectEvent != null) {
+ Event suppliedEvent;
+ switch (collectEvent.getEvent().getType()) {
+ case TABLET_INSERTION:
+ suppliedEvent = supplyTabletInsertion(collectEvent);
+ break;
+ case TSFILE_INSERTION:
+ suppliedEvent = supplyTsFileInsertion(collectEvent);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported event type %s for Hybrid Realtime Collector %s",
+ collectEvent.getEvent().getType(), this));
+ }
+ if (suppliedEvent != null) {
+ return suppliedEvent;
+ }
+
+ collectEvent = pendingQueue.poll();
+ }
+
+ // means the pending queue is empty.
+ return null;
+ }
+
+ private Event supplyTabletInsertion(PipeRealtimeCollectEvent event) {
+ event
+ .getTsFileEpoch()
+ .migrateState(
+ this,
+ state ->
+ (state.equals(TsFileEpoch.State.EMPTY)) ?
TsFileEpoch.State.USING_TABLET : state);
+
+ if
(event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TABLET)) {
+ return event.getEvent();
+ }
+ // if the state is USING_TSFILE, discard the event and poll the next one.
+ return null;
+ }
+
+ private Event supplyTsFileInsertion(PipeRealtimeCollectEvent event) {
+ event
+ .getTsFileEpoch()
+ .migrateState(
+ this,
+ state -> {
+ // this would not happen, but just in case.
+ if (state.equals(TsFileEpoch.State.EMPTY)) {
+ LOGGER.warn(
+ String.format("EMPTY TsFileEpoch when supplying TsFile
Event %s", event));
+ return TsFileEpoch.State.USING_TSFILE;
+ }
+ return state;
+ });
+
+ if
(event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) {
+ return event.getEvent();
+ }
+ // if the state is USING_TABLET, discard the event and poll the next one.
+ return null;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
new file mode 100644
index 0000000000..163b450021
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.assigner;
+
+import org.apache.iotdb.db.pipe.config.PipeConfig;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadFactory;
+
+public class DisruptorQueue<E> {
+ private Disruptor<Container<E>> disruptor;
+ private RingBuffer<Container<E>> ringBuffer;
+
+ private DisruptorQueue() {}
+
+ public void publish(E obj) {
+ ringBuffer.publishEvent((container, sequence, o) -> container.setObj(o),
obj);
+ }
+
+ public void clear() {
+ disruptor.halt();
+ }
+
+ public static class Builder<E> {
+ private int ringBufferSize =
PipeConfig.getInstance().getDefaultRingBufferSize();
+ private ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
+ private ProducerType producerType = ProducerType.MULTI;
+ private WaitStrategy waitStrategy = new BlockingWaitStrategy();
+ private final List<EventHandler<E>> handlers = new ArrayList<>();
+
+ public Builder<E> setRingBufferSize(int ringBufferSize) {
+ this.ringBufferSize = ringBufferSize;
+ return this;
+ }
+
+ public Builder<E> setThreadFactory(ThreadFactory threadFactory) {
+ this.threadFactory = threadFactory;
+ return this;
+ }
+
+ public Builder<E> setProducerType(ProducerType producerType) {
+ this.producerType = producerType;
+ return this;
+ }
+
+ public Builder<E> setWaitStrategy(WaitStrategy waitStrategy) {
+ this.waitStrategy = waitStrategy;
+ return this;
+ }
+
+ public Builder<E> addEventHandler(EventHandler<E> eventHandler) {
+ this.handlers.add(eventHandler);
+ return this;
+ }
+
+ public DisruptorQueue<E> build() {
+ DisruptorQueue<E> disruptorQueue = new DisruptorQueue<>();
+ disruptorQueue.disruptor =
+ new Disruptor<>(
+ Container::new, ringBufferSize, threadFactory, producerType,
waitStrategy);
+ for (EventHandler<E> handler : handlers) {
+ disruptorQueue.disruptor.handleEventsWith(
+ (container, sequence, endOfBatch) ->
+ handler.onEvent(container.getObj(), sequence, endOfBatch));
+ }
+ disruptorQueue.disruptor.start();
+ disruptorQueue.ringBuffer = disruptorQueue.disruptor.getRingBuffer();
+ return disruptorQueue;
+ }
+ }
+
+ private static class Container<E> {
+ private E obj;
+
+ private Container() {}
+
+ public E getObj() {
+ return obj;
+ }
+
+ public void setObj(E obj) {
+ this.obj = obj;
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
new file mode 100644
index 0000000000..38ee9257be
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.assigner;
+
+import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import
org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
+import
org.apache.iotdb.db.pipe.core.collector.realtime.matcher.PipeDataRegionMatcher;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+
+import com.lmax.disruptor.dsl.ProducerType;
+
+public class PipeDataRegionAssigner {
+
+ /** The matcher is used to match the event with the collector based on the
pattern. */
+ private final PipeDataRegionMatcher matcher;
+
+ /** The disruptor is used to assign the event to the collector. */
+ private final DisruptorQueue<PipeRealtimeCollectEvent> disruptor;
+
+ public PipeDataRegionAssigner() {
+ this.matcher = new CachedSchemaPatternMatcher();
+ this.disruptor =
+ new DisruptorQueue.Builder<PipeRealtimeCollectEvent>()
+ .setProducerType(ProducerType.SINGLE)
+ .addEventHandler(
+ (event, sequence, endOfBatch) -> {
+ matcher.match(event);
+ event.gcSchemaInfo();
+ })
+ .build();
+ }
+
+ public void publishToAssign(PipeRealtimeCollectEvent event) {
+ disruptor.publish(event);
+ }
+
+ public void startAssignTo(PipeRealtimeDataRegionCollector collector) {
+ matcher.register(collector);
+ }
+
+ public void stopAssignTo(PipeRealtimeDataRegionCollector collector) {
+ matcher.deregister(collector);
+ }
+
+ public boolean notMoreCollectorNeededToBeAssigned() {
+ return matcher.getRegisterCount() == 0;
+ }
+
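+ /**
+ * Clear the matcher and halt the disruptor. The method publishToAssign should not be called
+ * after calling this method, because the matcher no longer holds any collector and the
+ * disruptor has been halted.
+ */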
+ public void gc() {
+ matcher.clear();
+ disruptor.clear();
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
deleted file mode 100644
index 7525e06b54..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.cache;
-
-public class PipeRealtimeEventCache {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
deleted file mode 100644
index b91334430d..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
-
-public class IoTLogListerner {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
new file mode 100644
index 0000000000..bfa764c371
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import
org.apache.iotdb.db.pipe.core.collector.realtime.assigner.PipeDataRegionAssigner;
+import
org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEventFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * PipeInsertionDataNodeListener is a singleton in each data node.
+ *
+ * <p>It is used to listen to events from the storage engine and publish them to the pipe engine.
+ *
+ * <p>2 kinds of events are collected: 1. level-0 tsfile sealed event 2. insertion operation event
+ *
+ * <p>All events collected by this listener will first be published to different
+ * PipeDataRegionAssigners (identified by data region id), and then each PipeDataRegionAssigner
+ * will filter events and assign them to different PipeRealtimeDataRegionCollectors.
+ */
+public class PipeInsertionDataNodeListener {
+
+ private final ConcurrentMap<String, PipeDataRegionAssigner>
dataRegionId2Assigner =
+ new ConcurrentHashMap<>();
+
+ public synchronized void startListenAndAssign(
+ String dataRegionId, PipeRealtimeDataRegionCollector collector) {
+ dataRegionId2Assigner
+ .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner())
+ .startAssignTo(collector);
+ }
+
+ public synchronized void stopListenAndAssign(
+ String dataRegionId, PipeRealtimeDataRegionCollector collector) {
+ final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
+ if (assigner == null) {
+ return;
+ }
+
+ assigner.stopAssignTo(collector);
+
+ if (assigner.notMoreCollectorNeededToBeAssigned()) {
+ // the removed assigner is the same as the one referenced by the variable `assigner`
+ dataRegionId2Assigner.remove(dataRegionId);
+ // this will help to release the memory occupied by the assigner
+ assigner.gc();
+ }
+ }
+
+ // TODO: listen to the tsfile synced from the other cluster
+ // TODO: check whether the method is called in the right place. what is the meaning of the
+ // variable shouldClose before calling this method?
+ // TODO: maximize the efficiency of the method when there is no pipe in the system, i.e. avoid
+ // calling dataRegionId2Assigner.get(dataRegionId);
+ public void listenToTsFile(String dataRegionId, TsFileResource
tsFileResource) {
+ final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
+
+ // only events from registered data region will be collected
+ if (assigner == null) {
+ return;
+ }
+
+ assigner.publishToAssign(
+ PipeRealtimeCollectEventFactory.createCollectEvent(
+ tsFileResource.getTsFile(), tsFileResource));
+ }
+
+ // TODO: check whether the method is called in the right place.
+ public void listenToInsertNode(
+ String dataRegionId, InsertNode insertNode, TsFileResource
tsFileResource) {
+ final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(dataRegionId);
+
+ // only events from registered data region will be collected
+ if (assigner == null) {
+ return;
+ }
+
+ assigner.publishToAssign(
+ PipeRealtimeCollectEventFactory.createCollectEvent(insertNode,
tsFileResource));
+ }
+
+ /////////////////////////////// singleton ///////////////////////////////
+
+ private PipeInsertionDataNodeListener() {}
+
+ public static PipeInsertionDataNodeListener getInstance() {
+ return PipeChangeDataCaptureListenerHolder.INSTANCE;
+ }
+
+ private static class PipeChangeDataCaptureListenerHolder {
+ private static final PipeInsertionDataNodeListener INSTANCE =
+ new PipeInsertionDataNodeListener();
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
deleted file mode 100644
index 3aa42354f5..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
-
-public class RatisLogListener {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
deleted file mode 100644
index b5e1eaf93b..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
-
-public class SimpleLogListener {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
deleted file mode 100644
index 4c28795219..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
-
-public class TsFileGenerationListener {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
new file mode 100644
index 0000000000..0589cf09d7
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
+
+import org.apache.iotdb.db.pipe.config.PipeConfig;
+import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CachedSchemaPatternMatcher.class);
+
+ private final ReentrantReadWriteLock lock;
+
+ private final Set<PipeRealtimeDataRegionCollector> collectors;
+ private final Cache<String, Set<PipeRealtimeDataRegionCollector>>
deviceToCollectorsCache;
+
+ public CachedSchemaPatternMatcher() {
+ this.lock = new ReentrantReadWriteLock();
+ this.collectors = new HashSet<>();
+ this.deviceToCollectorsCache =
+
Caffeine.newBuilder().maximumSize(PipeConfig.getInstance().getMatcherCacheSize()).build();
+ }
+
+ @Override
+ public void register(PipeRealtimeDataRegionCollector collector) {
+ lock.writeLock().lock();
+ try {
+ collectors.add(collector);
+ deviceToCollectorsCache.invalidateAll();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void deregister(PipeRealtimeDataRegionCollector collector) {
+ lock.writeLock().lock();
+ try {
+ collectors.remove(collector);
+ deviceToCollectorsCache.invalidateAll();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public int getRegisterCount() {
+ lock.readLock().lock();
+ try {
+ return collectors.size();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ // TODO: maximize the efficiency of matching when pattern is root
+ // TODO: memory control
+ @Override
+ public void match(PipeRealtimeCollectEvent event) {
+ final Set<PipeRealtimeDataRegionCollector> matchedCollectors = new
HashSet<>();
+
+ lock.readLock().lock();
+ try {
+ if (collectors.isEmpty()) {
+ return;
+ }
+
+ for (final Map.Entry<String, String[]> entry :
event.getSchemaInfo().entrySet()) {
+ final String device = entry.getKey();
+ final String[] measurements = entry.getValue();
+
+ // 1. try to get matched collectors from cache, if not success, match
them by device
+ final Set<PipeRealtimeDataRegionCollector> collectorsFilteredByDevice =
+ deviceToCollectorsCache.get(device,
this::filterCollectorsByDevice);
+ // this would not happen
+ if (collectorsFilteredByDevice == null) {
+ LOGGER.warn(String.format("Match result NPE when handle device %s",
device));
+ continue;
+ }
+
+ // 2. filter matched candidate collectors by measurements
+ if (measurements.length == 0) {
+ // `measurements` is empty (only in case of tsfile event). match all
collectors.
+ //
+ // case 1: for example, pattern is root.a.b, device is root.a.b.c,
measurement can be any.
+ // in this case, the collector can be matched without checking the
measurements.
+ //
+ // case 2: for example, pattern is root.a.b.c, device is root.a.b.
+ // in this situation, the collector can not be matched in some
cases, but we can not know
+ // all the measurements of the device in an efficient way, so we
ASSUME that the collector
+ // can be matched. this is a trade-off between efficiency and
accuracy. for most user's
+ // usage, this is acceptable, which may result in some unnecessary
data processing and
+ // transmission, but will not result in data loss.
+ matchedCollectors.addAll(collectorsFilteredByDevice);
+ } else {
+ // `measurements` is not empty (only in case of tablet event). match
collectors by
+ // measurements.
+ collectorsFilteredByDevice.forEach(
+ collector -> {
+ final String pattern = collector.getPattern();
+
+ // case 1: for example, pattern is root.a.b and device is
root.a.b.c
+ // in this case, the collector can be matched without checking
the measurements
+ if (pattern.length() <= device.length()) {
+ matchedCollectors.add(collector);
+ }
+ // case 2: for example, pattern is root.a.b.c and device is
root.a.b
+ // in this case, we need to check the full path
+ else {
+ for (String measurement : measurements) {
+ // for example, pattern is root.a.b.c, device is root.a.b
and measurement is c
+ // in this case, the collector can be matched. other cases
are not matched.
+ // please note that there should be a . between device and
measurement.
+ if (
+ // low cost check comes first
+ pattern.length() == device.length() + measurement.length()
+ 1
+ // high cost check comes later
+ && pattern.endsWith(TsFileConstant.PATH_SEPARATOR +
measurement)) {
+ matchedCollectors.add(collector);
+ // there would be no more matched collectors because the
measurements are
+ // unique
+ break;
+ }
+ }
+ }
+ });
+ }
+
+ if (matchedCollectors.size() == collectors.size()) {
+ break;
+ }
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+
+ matchedCollectors.forEach(collector -> collector.collect(event));
+ }
+
+ private Set<PipeRealtimeDataRegionCollector> filterCollectorsByDevice(String
device) {
+ final Set<PipeRealtimeDataRegionCollector> filteredCollectors = new
HashSet<>();
+
+ for (PipeRealtimeDataRegionCollector collector : collectors) {
+ String pattern = collector.getPattern();
+ if (
+ // for example, pattern is root.a.b and device is root.a.b.c
+ // in this case, the collector can be matched without checking the
measurements
+ (pattern.length() <= device.length() && device.startsWith(pattern))
+ // for example, pattern is root.a.b.c and device is root.a.b
+ // in this case, the collector can be selected as candidate, but the
measurements should
+ // be checked further
+ || (pattern.length() > device.length() &&
pattern.startsWith(device))) {
+ filteredCollectors.add(collector);
+ }
+ }
+
+ return filteredCollectors;
+ }
+
+ @Override
+ public void clear() {
+ lock.writeLock().lock();
+ try {
+ collectors.clear();
+ deviceToCollectorsCache.invalidateAll();
+ deviceToCollectorsCache.cleanUp();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java
new file mode 100644
index 0000000000..dcdbffa71e
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
+
+import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+
+/**
+ * Matches realtime collect events against the patterns of the collectors registered with it and
+ * assigns each event to the collectors whose pattern matches.
+ */
+public interface PipeDataRegionMatcher {
+
+ /**
+ * Register a collector. If the collector's pattern matches the event's schema info, the event
+ * will be assigned to the collector.
+ *
+ * @param collector the collector to register
+ */
+ void register(PipeRealtimeDataRegionCollector collector);
+
+ /**
+ * Deregister a collector.
+ *
+ * @param collector the collector to deregister
+ */
+ void deregister(PipeRealtimeDataRegionCollector collector);
+
+ /**
+ * Get the number of registered collectors in this matcher.
+ *
+ * @return the number of registered collectors
+ */
+ int getRegisterCount();
+
+ /**
+ * Match the event's schema info with the registered collectors' patterns. If the event's schema
+ * info matches the pattern of a collector, the event will be assigned to the collector.
+ *
+ * @param event the event whose schema info is matched
+ */
+ void match(PipeRealtimeCollectEvent event);
+
+ /** Clear all the registered collectors and internal data structures. */
+ void clear();
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
deleted file mode 100644
index 948fab2acd..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
-
-public class Rule {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
deleted file mode 100644
index a543d7f0ef..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
-
-public class RulePrefixMatchTree {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
deleted file mode 100644
index a85d3a0b70..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
-
-public class TsFileEpoch {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
deleted file mode 100644
index 6948cef200..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
-
-public class TsFileEpochRecorder {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
similarity index 68%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index a58e8db497..3afeabf9ea 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -17,11 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.event;
+package org.apache.iotdb.db.pipe.core.event.impl;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.write.record.Tablet;
import java.util.Iterator;
@@ -29,18 +30,29 @@ import java.util.function.BiConsumer;
public class PipeTabletInsertionEvent implements TabletInsertionEvent {
+ private final InsertNode insertNode;
+
+ public PipeTabletInsertionEvent(InsertNode insertNode) {
+ this.insertNode = insertNode;
+ }
+
@Override
public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector>
consumer) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>,
RowCollector> consumer) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector>
consumer) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public String toString() {
+ return "PipeTabletInsertionEvent{" + "insertNode=" + insertNode + '}';
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
similarity index 65%
copy from
server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
copy to
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index d11b4f780d..7282fb77bc 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -17,20 +17,33 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.event;
+package org.apache.iotdb.db.pipe.core.event.impl;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+
+import java.io.File;
public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
+ private final File tsFile;
+
+ public PipeTsFileInsertionEvent(File tsFile) {
+ this.tsFile = tsFile;
+ }
+
@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public TsFileInsertionEvent
toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public String toString() {
+ return "PipeTsFileInsertionEvent{" + "tsFile=" + tsFile + '}';
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
deleted file mode 100644
index 22cbce9e04..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.indexer;
-
-public interface PipeEventIndexer {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
deleted file mode 100644
index 63aff8a519..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.indexer;
-
-public class PipeIoTEventIndexer {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
deleted file mode 100644
index 4520855085..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.indexer;
-
-public class PipeRatisEventIndexer {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
deleted file mode 100644
index d5add45547..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.indexer;
-
-public class PipeSimpleEventIndexer {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
deleted file mode 100644
index be53e89751..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.indexer;
-
-public class PipeTsFileEventIndexer implements PipeEventIndexer {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
new file mode 100644
index 0000000000..4041e74918
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.realtime;
+
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.EventType;
+
+import java.util.Map;
+
+/**
+ * An {@link Event} wrapper that carries, next to the wrapped event, the {@link TsFileEpoch} the
+ * event is bound to and a device-to-measurements map describing the event's schema info.
+ */
+public class PipeRealtimeCollectEvent implements Event {
+
+ // the wrapped event; getType() is delegated to it
+ private final Event event;
+ private final TsFileEpoch tsFileEpoch;
+
+ // not final: gcSchemaInfo() sets it to null so the map can be garbage collected
+ private Map<String, String[]> device2Measurements;
+
+ /**
+ * @param event the event to wrap
+ * @param tsFileEpoch the epoch this event is bound to
+ * @param device2Measurements schema info of the event, keyed by device
+ */
+ public PipeRealtimeCollectEvent(
+ Event event, TsFileEpoch tsFileEpoch, Map<String, String[]>
device2Measurements) {
+ this.event = event;
+ this.tsFileEpoch = tsFileEpoch;
+ this.device2Measurements = device2Measurements;
+ }
+
+ /** Returns the wrapped event. */
+ public Event getEvent() {
+ return event;
+ }
+
+ /** Returns the epoch this event is bound to. */
+ public TsFileEpoch getTsFileEpoch() {
+ return tsFileEpoch;
+ }
+
+ /**
+ * Returns the device-to-measurements map given at construction, or {@code null} once {@link
+ * #gcSchemaInfo()} has been called.
+ */
+ public Map<String, String[]> getSchemaInfo() {
+ return device2Measurements;
+ }
+
+ /** Drops the reference to the schema info; later {@link #getSchemaInfo()} calls return null. */
+ public void gcSchemaInfo() {
+ device2Measurements = null;
+ }
+
+ /** Returns the type of the wrapped event. */
+ @Override
+ public EventType getType() {
+ return event.getType();
+ }
+
+ // the schema info is intentionally not part of the textual form below
+ @Override
+ public String toString() {
+ return "PipeRealtimeCollectEvent{" + "event=" + event + ", tsFileEpoch=" +
tsFileEpoch + '}';
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
new file mode 100644
index 0000000000..2bb1b3174f
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.realtime;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+
+import java.io.File;
+
+/**
+ * Static factory for {@link PipeRealtimeCollectEvent}s. Every event is created through one
+ * shared {@link TsFileEpochManager}, which binds it to the epoch of the TsFile it belongs to.
+ */
+public class PipeRealtimeCollectEventFactory {
+
+ // single manager instance shared by all callers of this factory
+ private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new
TsFileEpochManager();
+
+ // TODO: resource control here?
+ /**
+ * Wraps the given TsFile in a new {@link PipeTsFileInsertionEvent} and binds it through the
+ * shared epoch manager.
+ *
+ * @param tsFile the file handed to the new insertion event
+ * @param resource the resource passed to the epoch manager for binding
+ * @return the collect event produced by the epoch manager
+ */
+ public static PipeRealtimeCollectEvent createCollectEvent(File tsFile,
TsFileResource resource) {
+ return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
+ new PipeTsFileInsertionEvent(tsFile), resource);
+ }
+
+ // TODO: resource control here?
+ /**
+ * Wraps the given insert node in a new {@link PipeTabletInsertionEvent} and binds it through
+ * the shared epoch manager.
+ *
+ * @param node the insert node handed to the new insertion event and to the epoch manager
+ * @param resource the resource passed to the epoch manager for binding
+ * @return the collect event produced by the epoch manager
+ */
+ public static PipeRealtimeCollectEvent createCollectEvent(
+ InsertNode node, TsFileResource resource) {
+ return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(
+ new PipeTabletInsertionEvent(node), node, resource);
+ }
+
+ private PipeRealtimeCollectEventFactory() {
+ // factory class, do not instantiate
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpoch.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpoch.java
new file mode 100644
index 0000000000..b6c71f45d4
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpoch.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.realtime;
+
+import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks, for one TsFile identified by its path, the {@link State} each realtime data region
+ * collector is currently in. A collector that has not been seen yet starts in {@link
+ * State#EMPTY}.
+ */
+public class TsFileEpoch {
+
+  private final String filePath;
+  private final ConcurrentMap<PipeRealtimeDataRegionCollector, AtomicReference<State>>
+      dataRegionCollector2State;
+
+  public TsFileEpoch(String filePath) {
+    this.filePath = filePath;
+    this.dataRegionCollector2State = new ConcurrentHashMap<>();
+  }
+
+  /** Returns the current state of the collector, registering it as EMPTY on first access. */
+  public TsFileEpoch.State getState(PipeRealtimeDataRegionCollector collector) {
+    return stateReferenceOf(collector).get();
+  }
+
+  /** Replaces the collector's state with the result of applying the migrator to it. */
+  public void migrateState(
+      PipeRealtimeDataRegionCollector collector, TsFileEpochStateMigrator visitor) {
+    stateReferenceOf(collector).getAndUpdate(visitor::migrate);
+  }
+
+  /** Looks up the state holder of the collector, creating an EMPTY one if none exists yet. */
+  private AtomicReference<State> stateReferenceOf(PipeRealtimeDataRegionCollector collector) {
+    return dataRegionCollector2State.computeIfAbsent(
+        collector, unused -> new AtomicReference<>(State.EMPTY));
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("TsFileEpoch{");
+    text.append("filePath='").append(filePath).append('\'');
+    text.append(", dataRegionCollector2State=").append(dataRegionCollector2State);
+    return text.append('}').toString();
+  }
+
+  public enum State {
+    EMPTY,
+    USING_TABLET,
+    USING_TSFILE
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
new file mode 100644
index 0000000000..b786977864
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.realtime;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+/**
+ * Keeps one {@link TsFileEpoch} per TsFile path and binds insertion events to it.
+ *
+ * <p>A single instance is shared process-wide through {@code PipeRealtimeCollectEventFactory},
+ * whose static methods can be called from any thread. The path-to-epoch map is therefore a
+ * concurrent map and every lookup below is a single atomic map operation. File paths must be
+ * non-null.
+ */
+public class TsFileEpochManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TsFileEpochManager.class);
+
+  private static final String[] EMPTY_MEASUREMENT_ARRAY = new String[0];
+
+  private final ConcurrentMap<String, TsFileEpoch> filePath2Epoch = new ConcurrentHashMap<>();
+
+  /**
+   * Binds a TsFile insertion event to the epoch of its TsFile and removes that epoch from this
+   * manager.
+   *
+   * @param event the TsFile insertion event to wrap
+   * @param resource supplies the TsFile path and the devices contained in the file
+   * @return a collect event whose schema info maps every device of the resource to an empty
+   *     measurement array
+   */
+  public PipeRealtimeCollectEvent bindPipeTsFileInsertionEvent(
+      PipeTsFileInsertionEvent event, TsFileResource resource) {
+    final String filePath = resource.getTsFilePath();
+
+    // TODO: we have to make sure that the TsFileInsertionEvent is the last event of the
+    // TsFileEpoch's life cycle
+    // remove() detaches the epoch in one atomic step, so no other thread can observe or
+    // re-create the entry between a separate lookup and removal
+    TsFileEpoch epoch = filePath2Epoch.remove(filePath);
+
+    // this would not happen, but just in case
+    if (epoch == null) {
+      LOGGER.warn("PipeEngine: can not find TsFileEpoch for TsFile {}, create it", filePath);
+      epoch = new TsFileEpoch(filePath);
+    }
+
+    return new PipeRealtimeCollectEvent(
+        event,
+        epoch,
+        resource.getDevices().stream()
+            .collect(Collectors.toMap(device -> device, device -> EMPTY_MEASUREMENT_ARRAY)));
+  }
+
+  /**
+   * Binds a tablet insertion event to the epoch of the TsFile it is written to, creating the
+   * epoch on first use.
+   *
+   * @param event the tablet insertion event to wrap
+   * @param node supplies the device path and the measurements of the insertion
+   * @param resource supplies the TsFile path
+   * @return a collect event whose schema info maps the node's device to its measurements
+   */
+  public PipeRealtimeCollectEvent bindPipeTabletInsertionEvent(
+      PipeTabletInsertionEvent event, InsertNode node, TsFileResource resource) {
+    return new PipeRealtimeCollectEvent(
+        event,
+        filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), TsFileEpoch::new),
+        Collections.singletonMap(node.getDevicePath().getFullPath(), node.getMeasurements()));
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochStateMigrator.java
similarity index 81%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochStateMigrator.java
index b5fad778b0..f75f7c87e5 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochStateMigrator.java
@@ -17,6 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.collector.realtime;
+package org.apache.iotdb.db.pipe.core.event.realtime;
-public class PipeRealtimeCollector {}
+/**
+ * Computes the next {@link TsFileEpoch.State} from the current one.
+ *
+ * <p>{@code TsFileEpoch#migrateState} applies an implementation through {@code
+ * AtomicReference#getAndUpdate}, which may invoke the function more than once when updates
+ * contend, so implementations should be free of side effects.
+ */
+@FunctionalInterface
+public interface TsFileEpochStateMigrator {
+ /**
+ * @param state the current state
+ * @return the state to migrate to
+ */
+ TsFileEpoch.State migrate(final TsFileEpoch.State state);
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
similarity index 97%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
index 43b438445f..6c9a5c55f8 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.event.access;
+package org.apache.iotdb.db.pipe.core.event.view.access;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRowIterator.java
similarity index 96%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRowIterator.java
index 960214bea6..73ee4d041a 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRowIterator.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.event.access;
+package org.apache.iotdb.db.pipe.core.event.view.access;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.access.RowIterator;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
similarity index 82%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 2c2bbfd38f..7cc0778193 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -17,12 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.event.collector;
+package org.apache.iotdb.db.pipe.core.event.view.collector;
import org.apache.iotdb.pipe.api.collector.EventCollector;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import java.io.IOException;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
similarity index 94%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
index 525e79c137..687c3e72c1 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.event.collector;
+package org.apache.iotdb.db.pipe.core.event.view.collector;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
similarity index 58%
rename from
server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
rename to
server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
index d11b4f780d..fc9ed29eaf 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
@@ -17,20 +17,29 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.event;
+package org.apache.iotdb.db.pipe.resource;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import java.io.File;
-public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
+/**
+ * Process-wide singleton entry point for pipe resource bookkeeping.
+ *
+ * <p>Both reference methods currently hand the given file straight back without recording
+ * anything. NOTE(review): presumably placeholders for real file reference counting - confirm.
+ */
+public class PipeResourceManager {
- @Override
- public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
- return null;
+ /**
+ * Currently a no-op.
+ *
+ * @param file the file a caller wants to reference
+ * @return the same {@code file} instance that was passed in
+ */
+ public File addFileReference(File file) {
+ return file;
}
- @Override
- public TsFileInsertionEvent
toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
- return null;
+ /**
+ * Currently a no-op.
+ *
+ * @param file the file a caller no longer references
+ * @return the same {@code file} instance that was passed in
+ */
+ public File removeFileReference(File file) {
+ return file;
+ }
+
+ ///////////////////////////// SINGLETON /////////////////////////////
+
+ // Private so that instances can only be obtained through getInstance().
+ private PipeResourceManager() {}
+
+ /** Returns the single shared instance held by {@code PipeResourceManagerHolder}. */
+ public static PipeResourceManager getInstance() {
+ return PipeResourceManagerHolder.INSTANCE;
+ }
+
+ // Nested holder whose only member is the shared instance.
+ private static class PipeResourceManagerHolder {
+ private static final PipeResourceManager INSTANCE = new
PipeResourceManager();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index e6764cf699..b420d9e51a 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.pipe.task.subtask;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index aa8772f2a1..34f8045cb6 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.db.pipe.task.subtask;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
new file mode 100644
index 0000000000..4a9abbef61
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector;
+
+import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import
org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Drives {@link CachedSchemaPatternMatcher} with a large number of events while collectors are
+ * being registered on a background thread, and prints a rough matching throughput figure.
+ *
+ * <p>Correctness is checked inside {@link PipeRealtimeFakeDataRegionCollector#collect}: every
+ * event handed to a collector must be compatible with that collector's pattern.
+ */
+public class CachedSchemaPatternMatcherTest {
+
+  private CachedSchemaPatternMatcher matcher;
+  private ExecutorService executorService;
+  private List<PipeRealtimeDataRegionCollector> collectorList;
+
+  @Before
+  public void setUp() {
+    matcher = new CachedSchemaPatternMatcher();
+    executorService = Executors.newSingleThreadExecutor();
+    collectorList = new ArrayList<>();
+  }
+
+  @After
+  public void tearDown() {
+    executorService.shutdownNow();
+  }
+
+  @Test
+  public void testCachedMatcher() throws ExecutionException, InterruptedException {
+    // One collector on the root, one per device and one per series.
+    collectorList.add(new PipeRealtimeFakeDataRegionCollector("root", "1"));
+
+    int deviceCollectorNum = 10;
+    int seriesCollectorNum = 10;
+    for (int i = 0; i < deviceCollectorNum; i++) {
+      collectorList.add(new PipeRealtimeFakeDataRegionCollector("root." + i, "1"));
+      for (int j = 0; j < seriesCollectorNum; j++) {
+        collectorList.add(new PipeRealtimeFakeDataRegionCollector("root." + i + "." + j, "1"));
+      }
+    }
+
+    // Registration runs on the executor thread while the matching loop below runs on this thread.
+    Future<?> future =
+        executorService.submit(
+            () -> collectorList.forEach(collector -> matcher.register(collector)));
+
+    int epochNum = 10000;
+    int deviceNum = 1000;
+    int seriesNum = 100;
+    Map<String, String[]> deviceMap =
+        IntStream.range(0, deviceNum)
+            .mapToObj(String::valueOf)
+            .collect(Collectors.toMap(s -> "root." + s, s -> new String[0]));
+    String[] measurements =
+        IntStream.range(0, seriesNum).mapToObj(String::valueOf).toArray(String[]::new);
+
+    // Accumulate in nanoseconds: a single match is far shorter than one millisecond, so summing
+    // millisecond deltas would mostly add zeros.
+    long totalTimeNanos = 0;
+    for (int i = 0; i < epochNum; i++) {
+      for (int j = 0; j < deviceNum; j++) {
+        // One single-device event per device index of this epoch.
+        totalTimeNanos +=
+            timedMatch(
+                new PipeRealtimeCollectEvent(
+                    null, null, Collections.singletonMap("root." + j, measurements)));
+      }
+      // Plus one event that covers every device at once, without measurements.
+      totalTimeNanos += timedMatch(new PipeRealtimeCollectEvent(null, null, deviceMap));
+    }
+
+    long totalTime = totalTimeNanos / 1_000_000L;
+    System.out.println("matcher.getRegisterCount() = " + matcher.getRegisterCount());
+    System.out.println("totalTime = " + totalTime);
+    System.out.println(
+        "device match per second = "
+            + ((double) (epochNum * (deviceNum + 1))
+                / (double) (totalTimeNanos)
+                * 1_000_000_000.0));
+
+    // Surface any failure raised by the background registration.
+    future.get();
+  }
+
+  /**
+   * Runs a single match and measures it.
+   *
+   * @param event the event to hand to the matcher
+   * @return elapsed time of the {@code match} call in nanoseconds
+   */
+  private long timedMatch(PipeRealtimeCollectEvent event) {
+    long startTime = System.nanoTime();
+    matcher.match(event);
+    return System.nanoTime() - startTime;
+  }
+
+  /** Collector stub that only verifies the events it receives are compatible with its pattern. */
+  public static class PipeRealtimeFakeDataRegionCollector extends PipeRealtimeDataRegionCollector {
+
+    public PipeRealtimeFakeDataRegionCollector(String pattern, String dataRegionId) {
+      super(pattern, dataRegionId);
+    }
+
+    @Override
+    public Event supply() {
+      return null;
+    }
+
+    /** Fails the test unless at least one schema entry of {@code event} fits the pattern. */
+    @Override
+    public void collect(PipeRealtimeCollectEvent event) {
+      // Single-element array so the lambda can update the flag.
+      final boolean[] matched = {false};
+      event
+          .getSchemaInfo()
+          .forEach(
+              (device, measurements) ->
+                  matched[0] = matched[0] || matchesPattern(device, measurements));
+      Assert.assertTrue(matched[0]);
+    }
+
+    /**
+     * Checks one schema entry against this collector's pattern.
+     *
+     * @param device device path of the entry
+     * @param measurements measurement names of the entry, possibly empty
+     * @return with measurements: whether any full series path starts with the pattern; without
+     *     measurements: whether the device and the pattern are prefixes of one another
+     */
+    private boolean matchesPattern(String device, String[] measurements) {
+      final String pattern = getPattern();
+      if (measurements.length == 0) {
+        return pattern.startsWith(device) || device.startsWith(pattern);
+      }
+      for (String measurement : measurements) {
+        if ((device + TsFileConstant.PATH_SEPARATOR + measurement).startsWith(pattern)) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
new file mode 100644
index 0000000000..3d395456d2
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeHybridDataRegionCollector;
+import
org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.EventType;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/**
+ * End-to-end check of the realtime collect path: writer threads feed insert nodes and TsFiles
+ * into {@link PipeInsertionDataNodeListener}, and listener threads poll the started collectors
+ * until each has seen the expected weighted number of events.
+ */
+public class PipeRealtimeCollectTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeCollectTest.class);
+
+  private final String dataRegion1 = "dataRegion-1";
+  private final String dataRegion2 = "dataRegion-2";
+  private final String pattern1 = "root.sg.d";
+  private final String pattern2 = "root.sg.d.a";
+  private final String[] device = new String[] {"root", "sg", "d"};
+  // Listener loops keep polling only while this flag is set.
+  private final AtomicBoolean alive = new AtomicBoolean();
+
+  private ExecutorService writeService;
+  private ExecutorService listenerService;
+
+  @Before
+  public void setUp() {
+    writeService = Executors.newFixedThreadPool(2);
+    listenerService = Executors.newFixedThreadPool(4);
+  }
+
+  @After
+  public void tearDown() {
+    // The listener loops only exit on this flag or on reaching their expected count, so clear it
+    // before shutting the pools down to avoid leaving spinning threads behind.
+    alive.set(false);
+    writeService.shutdownNow();
+    listenerService.shutdownNow();
+  }
+
+  @Test
+  public void testRealtimeCollectProcess() throws ExecutionException, InterruptedException {
+    // set up realtime collector
+
+    try (PipeRealtimeHybridDataRegionCollector collector1 =
+            new PipeRealtimeHybridDataRegionCollector(pattern1, dataRegion1);
+        PipeRealtimeHybridDataRegionCollector collector2 =
+            new PipeRealtimeHybridDataRegionCollector(pattern2, dataRegion1);
+        PipeRealtimeHybridDataRegionCollector collector3 =
+            new PipeRealtimeHybridDataRegionCollector(pattern1, dataRegion2);
+        PipeRealtimeHybridDataRegionCollector collector4 =
+            new PipeRealtimeHybridDataRegionCollector(pattern2, dataRegion2)) {
+
+      PipeRealtimeDataRegionCollector[] collectors =
+          new PipeRealtimeDataRegionCollector[] {collector1, collector2, collector3, collector4};
+      int writeNum = 10;
+
+      // start collector 0, 1
+      collectors[0].start();
+      collectors[1].start();
+
+      // test result of collector 0, 1
+      List<Future<?>> writeFutures = writeToBothDataRegions(writeNum);
+      alive.set(true);
+      awaitListeners(
+          Arrays.asList(
+              listenWithTypeWeight(collectors[0], writeNum),
+              listenWithUnitWeight(collectors[1], writeNum)));
+      awaitWrites(writeFutures);
+
+      // start collector 2, 3
+      collectors[2].start();
+      collectors[3].start();
+
+      // test result of collector 0 - 3
+      writeFutures = writeToBothDataRegions(writeNum);
+      alive.set(true);
+      awaitListeners(
+          Arrays.asList(
+              listenWithTypeWeight(collectors[0], writeNum),
+              listenWithUnitWeight(collectors[1], writeNum),
+              listenWithTypeWeight(collectors[2], writeNum),
+              listenWithUnitWeight(collectors[3], writeNum)));
+      awaitWrites(writeFutures);
+    }
+  }
+
+  /** Submits one writer task per data region, each performing {@code writeNum} iterations. */
+  private List<Future<?>> writeToBothDataRegions(int writeNum) {
+    return Arrays.asList(
+        write2DataRegion(writeNum, dataRegion1), write2DataRegion(writeNum, dataRegion2));
+  }
+
+  /**
+   * Listens on a collector created with {@code pattern1}: tablet insertions weigh 1, any other
+   * event type weighs 2, and the expected total is {@code writeNum * 2}.
+   */
+  private Future<?> listenWithTypeWeight(PipeRealtimeDataRegionCollector collector, int writeNum) {
+    return listen(
+        collector, type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2, writeNum << 1);
+  }
+
+  /**
+   * Listens on a collector created with {@code pattern2}: every event weighs 1 and the expected
+   * total is {@code writeNum}.
+   */
+  private Future<?> listenWithUnitWeight(PipeRealtimeDataRegionCollector collector, int writeNum) {
+    return listen(collector, type -> 1, writeNum);
+  }
+
+  /**
+   * Waits for every listener, at most 10 minutes each, and fails the test on timeout.
+   *
+   * <p>The {@code alive} flag is always cleared afterwards so that listeners still polling after
+   * a timeout or a failed sibling stop instead of spinning forever.
+   */
+  private void awaitListeners(List<Future<?>> listenFutures)
+      throws ExecutionException, InterruptedException {
+    try {
+      for (Future<?> listenFuture : listenFutures) {
+        listenFuture.get(10, TimeUnit.MINUTES);
+      }
+    } catch (TimeoutException e) {
+      LOGGER.warn("Time out when listening collector", e);
+      Assert.fail();
+    } finally {
+      alive.set(false);
+    }
+  }
+
+  /** Waits for all writer tasks, propagating their failures and interruption unchanged. */
+  private void awaitWrites(List<Future<?>> writeFutures)
+      throws ExecutionException, InterruptedException {
+    for (Future<?> writeFuture : writeFutures) {
+      writeFuture.get();
+    }
+  }
+
+  /**
+   * Submits a writer task. Each of the {@code writeNum} iterations reports two insert-row nodes
+   * (measurements "a" and "b") and then the TsFile itself to the listener of {@code dataRegionId}.
+   */
+  private Future<?> write2DataRegion(int writeNum, String dataRegionId) {
+    return writeService.submit(
+        () -> {
+          for (int i = 0; i < writeNum; ++i) {
+            TsFileResource resource =
+                new TsFileResource(
+                    new File(dataRegionId, String.format("%s-%s-0-0.tsfile", i, i)));
+            resource.updateStartTime(String.join(TsFileConstant.PATH_SEPARATOR, device), 0);
+
+            PipeInsertionDataNodeListener.getInstance()
+                .listenToInsertNode(dataRegionId, newInsertRowNode(i, "a"), resource);
+            PipeInsertionDataNodeListener.getInstance()
+                .listenToInsertNode(dataRegionId, newInsertRowNode(i, "b"), resource);
+            PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, resource);
+          }
+        });
+  }
+
+  /** Builds a value-less insert-row node for the test device with a single measurement. */
+  private InsertRowNode newInsertRowNode(int planNodeId, String measurement) {
+    return new InsertRowNode(
+        new PlanNodeId(String.valueOf(planNodeId)),
+        new PartialPath(device),
+        false,
+        new String[] {measurement},
+        null,
+        0,
+        null,
+        false);
+  }
+
+  /**
+   * Submits a listener task that polls {@code collector} until the weighted event count reaches
+   * {@code expectNum} or the {@code alive} flag is cleared, then asserts the count.
+   *
+   * @param collector collector to poll through {@code supply()}
+   * @param weight contribution of an event of a given type to the count
+   * @param expectNum weighted count the listener must reach exactly
+   */
+  private Future<?> listen(
+      PipeRealtimeDataRegionCollector collector,
+      Function<EventType, Integer> weight,
+      int expectNum) {
+    return listenerService.submit(
+        () -> {
+          int eventNum = 0;
+          while (alive.get() && eventNum < expectNum) {
+            final Event event;
+            try {
+              event = collector.supply();
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+            if (event != null) {
+              eventNum += weight.apply(event.getType());
+            }
+          }
+          // Asserted only after a normal loop exit: inside a finally block this assertion would
+          // replace a supply() failure with an unrelated count mismatch.
+          Assert.assertEquals(expectNum, eventNum);
+        });
+  }
+}
diff --git a/server/src/test/resources/logback-test.xml
b/server/src/test/resources/logback-test.xml
index d70151b7dd..eeab97f16b 100644
--- a/server/src/test/resources/logback-test.xml
+++ b/server/src/test/resources/logback-test.xml
@@ -40,6 +40,7 @@
<!-- enable me if you want to monitor when files are opened and closed.
<logger name="FileMonitor" level="info"/>
-->
+ <logger name="org.apache.iotdb.db.pipe" level="INFO"/>
<logger name="org.apache.iotdb.db.sync" level="INFO"/>
<logger name="org.apache.iotdb.db.engine.merge" level="INFO"/>
<logger name="org.apache.iotdb.db.metadata" level="INFO"/>