This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fa35d57cd0 Pipe: Implement down-sampling-processor plugin for 
reducing the amount of data transferred during data sync (#11557)
4fa35d57cd0 is described below

commit 4fa35d57cd04d5198e48d39e8790bf1c9fab92c9
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 28 18:11:06 2023 +0800

    Pipe: Implement down-sampling-processor plugin for reducing the amount of 
data transferred during data sync (#11557)
    
    This commit introduces down-sampling-processor plugin, it guarantees this 
function:
    
    1. Memory permitting, a tablet row and can only be transferred iff there 
exists measurement with timeStamp greater than last timeStamp recorded with the 
specified time interval.
    2. A file won't be parsed unless down-sampling.split-file is configured to 
true.
    
    The plugin's parameters as follow:
    
    | key     | value     | value range     | required or optional with default 
|
    | -------- | -------- | -------- | -------- |
    | processor | down-sampling-processor | down-sampling-processor |  required 
|
    | processor.down-sampling.interval-seconds | Sampling interval of specifed 
tablets | long | optional : 60 |
    | processor.down-sampling.split-file | Whether to split the file during 
processing | boolean | optional : false |
    | processor.down-sampling.memory-limit-in-bytes | Initial memory for the 
down-sampling-processor cache | long | optional : 16 * 1024 * 1024 |
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../org/apache/iotdb/db/it/utils/TestUtils.java    |  12 +-
 .../iotdb/pipe/PipeEnvironmentException.java       |  12 +-
 .../apache/iotdb/pipe/it/AbstractPipeDualIT.java   |  60 ++++++
 .../apache/iotdb/pipe/it/IoTDBPipeClusterIT.java   |  65 +++----
 .../pipe/it/IoTDBPipeConnectorParallelIT.java      |  28 +--
 .../apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java  |  29 +--
 .../apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java |  28 +--
 .../apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java |  29 +--
 ...orParallelIT.java => IoTDBPipeProcessorIT.java} |  68 +++----
 .../apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java  |  27 ++-
 .../iotdb/pipe/it/IoTDBPipeSingleEnvDemoIT.java    |  16 +-
 .../iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java     |  29 +--
 .../apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java    |  29 +--
 .../java/org/apache/iotdb/pipe/api/access/Row.java |  10 +-
 .../agent/plugin/PipeProcessorConstructor.java     |   3 +
 .../config/constant/PipeProcessorConstant.java     |  12 ++
 .../env/PipeTaskConnectorRuntimeEnvironment.java   |   9 +-
 .../env/PipeTaskExtractorRuntimeEnvironment.java   |   9 +-
 ...va => PipeTaskProcessorRuntimeEnvironment.java} |  13 +-
 .../plugin/env/PipeTaskRuntimeEnvironment.java     |   8 +-
 .../iotdb/db/pipe/event/common/row/PipeRow.java    |   5 +
 .../downsampling/DownSamplingProcessor.java        | 213 +++++++++++++++++++++
 .../downsampling/PartialPathLastTimeCache.java     | 111 +++++++++++
 .../db/pipe/resource/memory/PipeMemoryBlock.java   |  14 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |   6 +-
 .../dataregion/wal/utils/WALInsertNodeCache.java   |   8 +-
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |  47 +++--
 .../builtin/processor/DownSamplingProcessor.java}  |  17 +-
 .../builtin/processor/PlaceHolderProcessor.java    |  68 +++++++
 .../org/apache/iotdb/tsfile/utils/BytesUtils.java  |  10 +
 30 files changed, 653 insertions(+), 342 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index f7aac47b65f..3fa35d05f25 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -414,11 +414,13 @@ public class TestUtils {
   // This method will not throw failure given that a failure is encountered.
   // Instead, it return a flag to indicate the result of the execution.
   public static boolean tryExecuteNonQueriesWithRetry(BaseEnv env, 
List<String> sqlList) {
+    int lastIndex = 0;
     for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
       try (Connection connection = env.getConnection();
           Statement statement = connection.createStatement()) {
-        for (String sql : sqlList) {
-          statement.execute(sql);
+        for (int i = lastIndex; i < sqlList.size(); ++i) {
+          statement.execute(sqlList.get(i));
+          lastIndex = i;
         }
         return true;
       } catch (SQLException e) {
@@ -483,11 +485,13 @@ public class TestUtils {
 
   public static boolean tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry(
       BaseEnv env, DataNodeWrapper wrapper, List<String> sqlList) {
+    int lastIndex = 0;
     for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
       try (Connection connection = 
env.getConnectionWithSpecifiedDataNode(wrapper);
           Statement statement = connection.createStatement()) {
-        for (String sql : sqlList) {
-          statement.execute(sql);
+        for (int i = lastIndex; i < sqlList.size(); ++i) {
+          statement.execute(sqlList.get(i));
+          lastIndex = i;
         }
         return true;
       } catch (SQLException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/PipeEnvironmentException.java
similarity index 75%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
copy to 
integration-test/src/test/java/org/apache/iotdb/pipe/PipeEnvironmentException.java
index 8caa87c03ec..5788e89167a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/PipeEnvironmentException.java
@@ -17,13 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.config.constant;
+package org.apache.iotdb.pipe;
 
-public class PipeProcessorConstant {
+public class PipeEnvironmentException extends RuntimeException {
 
-  public static final String PROCESSOR_KEY = "processor";
+  public PipeEnvironmentException(String message) {
+    super(message);
+  }
 
-  private PipeProcessorConstant() {
-    throw new IllegalStateException("Utility class");
+  public PipeEnvironmentException(String message, Throwable cause) {
+    super(message, cause);
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/AbstractPipeDualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/AbstractPipeDualIT.java
new file mode 100644
index 00000000000..62d87ac9f1f
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/AbstractPipeDualIT.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it;
+
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.pipe.PipeEnvironmentException;
+
+import org.junit.After;
+import org.junit.Before;
+
+abstract class AbstractPipeDualIT {
+
+  protected BaseEnv senderEnv;
+  protected BaseEnv receiverEnv;
+
+  @Before
+  public void setUp() throws PipeEnvironmentException {
+    try {
+      MultiEnvFactory.createEnv(2);
+      senderEnv = MultiEnvFactory.getEnv(0);
+      receiverEnv = MultiEnvFactory.getEnv(1);
+
+      senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+      
receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+
+      senderEnv.initClusterEnvironment();
+      receiverEnv.initClusterEnvironment();
+    } catch (Exception | Error e) {
+      throw new PipeEnvironmentException(e.getMessage(), e);
+    }
+  }
+
+  @After
+  public void tearDown() throws PipeEnvironmentException {
+    try {
+      senderEnv.cleanClusterEnvironment();
+      receiverEnv.cleanClusterEnvironment();
+    } catch (Exception | Error e) {
+      throw new PipeEnvironmentException(e.getMessage(), e);
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
index 8ead12976c2..91a4c13d1f9 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
@@ -35,11 +35,10 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.pipe.PipeEnvironmentException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -61,41 +60,37 @@ import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
-public class IoTDBPipeClusterIT {
-
-  private BaseEnv senderEnv;
-  private BaseEnv receiverEnv;
+public class IoTDBPipeClusterIT extends AbstractPipeDualIT {
 
+  @Override
   @Before
-  public void setUp() throws Exception {
-    MultiEnvFactory.createEnv(2);
-    senderEnv = MultiEnvFactory.getEnv(0);
-    receiverEnv = MultiEnvFactory.getEnv(1);
-
-    senderEnv
-        .getConfig()
-        .getCommonConfig()
-        .setAutoCreateSchemaEnabled(true)
-        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
-
-    receiverEnv
-        .getConfig()
-        .getCommonConfig()
-        .setAutoCreateSchemaEnabled(true)
-        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
-
-    senderEnv.initClusterEnvironment(3, 3, 180);
-    receiverEnv.initClusterEnvironment(3, 3, 180);
-  }
-
-  @After
-  public void tearDown() {
-    senderEnv.cleanClusterEnvironment();
-    receiverEnv.cleanClusterEnvironment();
+  public void setUp() throws PipeEnvironmentException {
+    try {
+      MultiEnvFactory.createEnv(2);
+      senderEnv = MultiEnvFactory.getEnv(0);
+      receiverEnv = MultiEnvFactory.getEnv(1);
+
+      senderEnv
+          .getConfig()
+          .getCommonConfig()
+          .setAutoCreateSchemaEnabled(true)
+          
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+          
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+          .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+
+      receiverEnv
+          .getConfig()
+          .getCommonConfig()
+          .setAutoCreateSchemaEnabled(true)
+          
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+          
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+          .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+
+      senderEnv.initClusterEnvironment(3, 3, 180);
+      receiverEnv.initClusterEnvironment(3, 3, 180);
+    } catch (Exception | Error e) {
+      throw new PipeEnvironmentException(e.getMessage(), e);
+    }
   }
 
   @Test
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
index 271957d61f9..161cfbfd4da 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
@@ -23,16 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
-import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -45,29 +41,7 @@ import java.util.Set;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
-public class IoTDBPipeConnectorParallelIT {
-  private BaseEnv senderEnv;
-  private BaseEnv receiverEnv;
-
-  @Before
-  public void setUp() throws Exception {
-    MultiEnvFactory.createEnv(2);
-    senderEnv = MultiEnvFactory.getEnv(0);
-    receiverEnv = MultiEnvFactory.getEnv(1);
-
-    senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-    receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
-    senderEnv.initClusterEnvironment();
-    receiverEnv.initClusterEnvironment();
-  }
-
-  @After
-  public void tearDown() {
-    senderEnv.cleanClusterEnvironment();
-    receiverEnv.cleanClusterEnvironment();
-  }
-
+public class IoTDBPipeConnectorParallelIT extends AbstractPipeDualIT {
   @Test
   public void testIoTConnectorParallel() throws Exception {
     DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
index 52499cd14d4..65eb2a20f28 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
@@ -23,16 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
-import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -44,30 +40,7 @@ import java.util.Map;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
-public class IoTDBPipeDataSinkIT {
-
-  private BaseEnv senderEnv;
-  private BaseEnv receiverEnv;
-
-  @Before
-  public void setUp() throws Exception {
-    MultiEnvFactory.createEnv(2);
-    senderEnv = MultiEnvFactory.getEnv(0);
-    receiverEnv = MultiEnvFactory.getEnv(1);
-
-    senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-    receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
-    senderEnv.initClusterEnvironment();
-    receiverEnv.initClusterEnvironment();
-  }
-
-  @After
-  public void tearDown() {
-    senderEnv.cleanClusterEnvironment();
-    receiverEnv.cleanClusterEnvironment();
-  }
-
+public class IoTDBPipeDataSinkIT extends AbstractPipeDualIT {
   @Test
   public void testThriftConnector() throws Exception {
     DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
index a8f0828bfa9..7d183548e65 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
@@ -25,16 +25,13 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
-import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2;
 import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -53,30 +50,7 @@ import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
-public class IoTDBPipeExtractorIT {
-
-  private BaseEnv senderEnv;
-  private BaseEnv receiverEnv;
-
-  @Before
-  public void setUp() throws Exception {
-    MultiEnvFactory.createEnv(2);
-    senderEnv = MultiEnvFactory.getEnv(0);
-    receiverEnv = MultiEnvFactory.getEnv(1);
-
-    senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-    receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
-    senderEnv.initClusterEnvironment();
-    receiverEnv.initClusterEnvironment();
-  }
-
-  @After
-  public void tearDown() {
-    senderEnv.cleanClusterEnvironment();
-    receiverEnv.cleanClusterEnvironment();
-  }
-
+public class IoTDBPipeExtractorIT extends AbstractPipeDualIT {
   @Test
   public void testExtractorValidParameter() throws Exception {
     DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
index 2cd8acc60c2..8226d25969d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
@@ -23,16 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
-import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -47,30 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
-public class IoTDBPipeLifeCycleIT {
-
-  private BaseEnv senderEnv;
-  private BaseEnv receiverEnv;
-
-  @Before
-  public void setUp() throws Exception {
-    MultiEnvFactory.createEnv(2);
-    senderEnv = MultiEnvFactory.getEnv(0);
-    receiverEnv = MultiEnvFactory.getEnv(1);
-
-    senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-    receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
-    senderEnv.initClusterEnvironment();
-    receiverEnv.initClusterEnvironment();
-  }
-
-  @After
-  public void tearDown() {
-    senderEnv.cleanClusterEnvironment();
-    receiverEnv.cleanClusterEnvironment();
-  }
-
+public class IoTDBPipeLifeCycleIT extends AbstractPipeDualIT {
   @Test
   public void testLifeCycleWithHistoryEnabled() throws Exception {
     DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProcessorIT.java
similarity index 62%
copy from 
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
copy to 
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProcessorIT.java
index 271957d61f9..42ffd2e8d74 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProcessorIT.java
@@ -23,16 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
-import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -45,48 +41,30 @@ import java.util.Set;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
-public class IoTDBPipeConnectorParallelIT {
-  private BaseEnv senderEnv;
-  private BaseEnv receiverEnv;
-
-  @Before
-  public void setUp() throws Exception {
-    MultiEnvFactory.createEnv(2);
-    senderEnv = MultiEnvFactory.getEnv(0);
-    receiverEnv = MultiEnvFactory.getEnv(1);
-
-    senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-    receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
-    senderEnv.initClusterEnvironment();
-    receiverEnv.initClusterEnvironment();
-  }
-
-  @After
-  public void tearDown() {
-    senderEnv.cleanClusterEnvironment();
-    receiverEnv.cleanClusterEnvironment();
-  }
-
+public class IoTDBPipeProcessorIT extends AbstractPipeDualIT {
   @Test
-  public void testIoTConnectorParallel() throws Exception {
+  public void testDownSamplingProcessor() throws Exception {
     DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     String receiverIp = receiverDataNode.getIp();
     int receiverPort = receiverDataNode.getPort();
 
-    Set<String> expectedResSet = new HashSet<>();
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
       Map<String, String> connectorAttributes = new HashMap<>();
 
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
-      connectorAttributes.put("connector.parallel.tasks", "3");
+      extractorAttributes.put("source.realtime.mode", "log");
+
+      processorAttributes.put("processor", "down-sampling-processor");
+      processorAttributes.put("processor.down-sampling.interval-seconds", 
"20");
+      processorAttributes.put("processor.down-sampling.split-file", "true");
+
+      connectorAttributes.put("sink", "iotdb-thrift-sink");
+      connectorAttributes.put("sink.batch.enable", "false");
+      connectorAttributes.put("sink.ip", receiverIp);
+      connectorAttributes.put("sink.port", Integer.toString(receiverPort));
 
       TSStatus status =
           client.createPipe(
@@ -99,22 +77,28 @@ public class IoTDBPipeConnectorParallelIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
 
+      // Do not fail if the failure has nothing to do with pipe
+      // Because the failures will randomly generate due to resource limitation
       if (!TestUtils.tryExecuteNonQueriesWithRetry(
           senderEnv,
           Arrays.asList(
-              "insert into root.sg1.d1(time, s1) values (0, 1)",
-              "insert into root.sg1.d1(time, s1) values (1, 2)",
-              "insert into root.sg1.d1(time, s1) values (2, 3)",
-              "insert into root.sg1.d1(time, s1) values (3, 4)"))) {
+              "insert into root.vehicle.d0(time, s1) values (0, 1)",
+              "insert into root.vehicle.d0(time, s1) values (10000, 2)",
+              "insert into root.vehicle.d0(time, s1) values (19999, 3)",
+              "insert into root.vehicle.d0(time, s1) values (20000, 4)",
+              "insert into root.vehicle.d0(time, s1) values (20001, 5)",
+              "insert into root.vehicle.d0(time, s1) values (45000, 6)"))) {
         return;
       }
 
+      Set<String> expectedResSet = new HashSet<>();
+
       expectedResSet.add("0,1.0,");
-      expectedResSet.add("1,2.0,");
-      expectedResSet.add("2,3.0,");
-      expectedResSet.add("3,4.0,");
+      expectedResSet.add("20000,4.0,");
+      expectedResSet.add("45000,6.0,");
+
       TestUtils.assertDataOnEnv(
-          receiverEnv, "select * from root.**", "Time,root.sg1.d1.s1,", 
expectedResSet);
+          receiverEnv, "select * from root.**", "Time,root.vehicle.d0.s1,", 
expectedResSet);
     }
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
index 6bcaee39b56..09df263e108 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
@@ -29,10 +29,9 @@ import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.pipe.PipeEnvironmentException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,22 +46,18 @@ import java.util.Map;
 /** Test pipe's basic functionalities under multiple cluster and consensus 
protocol settings. */
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
-public class IoTDBPipeProtocolIT {
-
-  private BaseEnv senderEnv;
-  private BaseEnv receiverEnv;
+public class IoTDBPipeProtocolIT extends AbstractPipeDualIT {
 
+  @Override
   @Before
-  public void setUp() throws Exception {
-    MultiEnvFactory.createEnv(2);
-    senderEnv = MultiEnvFactory.getEnv(0);
-    receiverEnv = MultiEnvFactory.getEnv(1);
-  }
-
-  @After
-  public void tearDown() {
-    senderEnv.cleanClusterEnvironment();
-    receiverEnv.cleanClusterEnvironment();
+  public void setUp() throws PipeEnvironmentException {
+    try {
+      MultiEnvFactory.createEnv(2);
+      senderEnv = MultiEnvFactory.getEnv(0);
+      receiverEnv = MultiEnvFactory.getEnv(1);
+    } catch (Exception | Error e) {
+      throw new PipeEnvironmentException(e.getMessage(), e);
+    }
   }
 
   private void innerSetUp(
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSingleEnvDemoIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSingleEnvDemoIT.java
index 4894a88a132..4a594730822 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSingleEnvDemoIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSingleEnvDemoIT.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT1;
+import org.apache.iotdb.pipe.PipeEnvironmentException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.After;
@@ -42,14 +43,21 @@ import java.util.Map;
 public class IoTDBPipeSingleEnvDemoIT {
   @Before
   public void setUp() throws Exception {
-    MultiEnvFactory.createEnv(1);
-
-    MultiEnvFactory.getEnv(0).initClusterEnvironment(1, 1);
+    try {
+      MultiEnvFactory.createEnv(1);
+      MultiEnvFactory.getEnv(0).initClusterEnvironment(1, 1);
+    } catch (Exception e) {
+      throw new PipeEnvironmentException(e.getMessage(), e);
+    }
   }
 
   @After
   public void tearDown() {
-    MultiEnvFactory.getEnv(0).cleanClusterEnvironment();
+    try {
+      MultiEnvFactory.getEnv(0).cleanClusterEnvironment();
+    } catch (Exception e) {
+      throw new PipeEnvironmentException(e.getMessage(), e);
+    }
   }
 
   @Test
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
index ea46ac87028..6c5838488ca 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
@@ -25,16 +25,12 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
-import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -45,30 +41,7 @@ import java.util.Map;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
-public class IoTDBPipeSwitchStatusIT {
-
-  private BaseEnv senderEnv;
-  private BaseEnv receiverEnv;
-
-  @Before
-  public void setUp() throws Exception {
-    MultiEnvFactory.createEnv(2);
-    senderEnv = MultiEnvFactory.getEnv(0);
-    receiverEnv = MultiEnvFactory.getEnv(1);
-
-    senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-    receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
-    senderEnv.initClusterEnvironment();
-    receiverEnv.initClusterEnvironment();
-  }
-
-  @After
-  public void tearDown() {
-    senderEnv.cleanClusterEnvironment();
-    receiverEnv.cleanClusterEnvironment();
-  }
-
+public class IoTDBPipeSwitchStatusIT extends AbstractPipeDualIT {
   @Test
   public void testPipeSwitchStatus() throws Exception {
     DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
index d29b8d9ea50..58e248dc251 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
@@ -24,16 +24,12 @@ import 
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
-import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -50,30 +46,7 @@ import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
-public class IoTDBPipeSyntaxIT {
-
-  private BaseEnv senderEnv;
-  private BaseEnv receiverEnv;
-
-  @Before
-  public void setUp() throws Exception {
-    MultiEnvFactory.createEnv(2);
-    senderEnv = MultiEnvFactory.getEnv(0);
-    receiverEnv = MultiEnvFactory.getEnv(1);
-
-    senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-    receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
-    senderEnv.initClusterEnvironment();
-    receiverEnv.initClusterEnvironment();
-  }
-
-  @After
-  public void tearDown() {
-    senderEnv.cleanClusterEnvironment();
-    receiverEnv.cleanClusterEnvironment();
-  }
-
+public class IoTDBPipeSyntaxIT extends AbstractPipeDualIT {
   @Test
   public void testValidPipeName() throws Exception {
     DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
index 333502da43b..e98968b0fef 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
@@ -137,7 +137,7 @@ public interface Row {
   int size();
 
   /**
-   * Returns the actual column index of the given column name.
+   * Returns the actual column index of the given column (measurement) name.
    *
    * @param columnName the column name in Path form
    * @return the actual column index of the given column name
@@ -145,6 +145,14 @@ public interface Row {
    */
   int getColumnIndex(Path columnName) throws PipeParameterNotValidException;
 
+  /**
+   * Returns the actual column (measurement) name of the given column index.
+   *
+   * @param columnIndex the column index
+   * @return the actual column (measurement) name of the given column index
+   */
+  String getColumnName(int columnIndex);
+
   /**
    * Returns the column data types in the Row.
    *
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeProcessorConstructor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeProcessorConstructor.java
index abd3e8669f9..68d32dc5529 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeProcessorConstructor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeProcessorConstructor.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
 import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
 import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant;
+import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
@@ -35,6 +36,8 @@ public class PipeProcessorConstructor extends 
PipePluginConstructor {
   protected void initConstructors() {
     PLUGIN_CONSTRUCTORS.put(
         BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(), 
DoNothingProcessor::new);
+    PLUGIN_CONSTRUCTORS.put(
+        BuiltinPipePlugin.DOWN_SAMPLING_PROCESSOR.getPipePluginName(), 
DownSamplingProcessor::new);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
index 8caa87c03ec..7593fc74f1a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
@@ -19,10 +19,22 @@
 
 package org.apache.iotdb.db.pipe.config.constant;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
+
 public class PipeProcessorConstant {
 
   public static final String PROCESSOR_KEY = "processor";
 
+  public static final String PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY =
+      "processor.down-sampling.interval-seconds";
+  public static final long 
PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_DEFAULT_VALUE = 60;
+  public static final String PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY =
+      "processor.down-sampling.split-file";
+  public static final boolean PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_DEFAULT_VALUE 
= false;
+  public static final String PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY 
=
+      "processor.down-sampling.memory-limit-in-bytes";
+  public static final long 
PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE = 16 * MB;
+
   private PipeProcessorConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
index a8fe2187750..207f0e3e0ab 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
@@ -21,14 +21,7 @@ package org.apache.iotdb.db.pipe.config.plugin.env;
 
 public class PipeTaskConnectorRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
 
-  private final int regionId;
-
   public PipeTaskConnectorRuntimeEnvironment(String pipeName, long 
creationTime, int regionId) {
-    super(pipeName, creationTime);
-    this.regionId = regionId;
-  }
-
-  public int getRegionId() {
-    return regionId;
+    super(pipeName, creationTime, regionId);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
index a726637d5b0..11ac1dfd0d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
@@ -23,21 +23,14 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 
 public class PipeTaskExtractorRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
 
-  private final int regionId;
-
   private final PipeTaskMeta pipeTaskMeta;
 
   public PipeTaskExtractorRuntimeEnvironment(
       String pipeName, long creationTime, int regionId, PipeTaskMeta 
pipeTaskMeta) {
-    super(pipeName, creationTime);
-    this.regionId = regionId;
+    super(pipeName, creationTime, regionId);
     this.pipeTaskMeta = pipeTaskMeta;
   }
 
-  public int getRegionId() {
-    return regionId;
-  }
-
   public PipeTaskMeta getPipeTaskMeta() {
     return pipeTaskMeta;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskProcessorRuntimeEnvironment.java
similarity index 76%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskProcessorRuntimeEnvironment.java
index a8fe2187750..eb6656decc3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskProcessorRuntimeEnvironment.java
@@ -19,16 +19,9 @@
 
 package org.apache.iotdb.db.pipe.config.plugin.env;
 
-public class PipeTaskConnectorRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
+public class PipeTaskProcessorRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
 
-  private final int regionId;
-
-  public PipeTaskConnectorRuntimeEnvironment(String pipeName, long 
creationTime, int regionId) {
-    super(pipeName, creationTime);
-    this.regionId = regionId;
-  }
-
-  public int getRegionId() {
-    return regionId;
+  public PipeTaskProcessorRuntimeEnvironment(String pipeName, long 
creationTime, int regionId) {
+    super(pipeName, creationTime, regionId);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
index a935dcf2dac..026d2605a95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
@@ -25,10 +25,12 @@ public class PipeTaskRuntimeEnvironment implements 
PipeRuntimeEnvironment {
 
   private final String pipeName;
   private final long creationTime;
+  private final int regionId;
 
-  public PipeTaskRuntimeEnvironment(String pipeName, long creationTime) {
+  protected PipeTaskRuntimeEnvironment(String pipeName, long creationTime, int 
regionId) {
     this.pipeName = pipeName;
     this.creationTime = creationTime;
+    this.regionId = regionId;
   }
 
   @Override
@@ -40,4 +42,8 @@ public class PipeTaskRuntimeEnvironment implements 
PipeRuntimeEnvironment {
   public long getCreationTime() {
     return creationTime;
   }
+
+  public int getRegionId() {
+    return regionId;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
index a4c34ae22bc..87f36db8191 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
@@ -159,6 +159,11 @@ public class PipeRow implements Row {
         String.format("column %s not found", columnName.getFullPath()));
   }
 
+  @Override
+  public String getColumnName(int columnIndex) {
+    return columnNameStringList[columnIndex];
+  }
+
   @Override
   public List<Type> getColumnTypes() {
     return 
PipeDataTypeTransformer.transformToPipeDataTypeList(Arrays.asList(valueColumnTypes));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
new file mode 100644
index 00000000000..59dbc21df9d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY;
+
+public class DownSamplingProcessor implements PipeProcessor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DownSamplingProcessor.class);
+
+  private String dataBaseNameWithPathSeparator;
+  private long intervalInCurrentPrecision;
+  private boolean shouldSplitFile;
+
+  private PartialPathLastTimeCache partialPathLastTimeCache;
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    // No need to validate.
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, 
PipeProcessorRuntimeConfiguration configuration)
+      throws Exception {
+    final String dataBaseName =
+        StorageEngine.getInstance()
+            .getDataRegion(
+                new DataRegionId(
+                    ((PipeTaskProcessorRuntimeEnvironment) 
configuration.getRuntimeEnvironment())
+                        .getRegionId()))
+            .getDatabaseName();
+    final long intervalSeconds =
+        parameters.getLongOrDefault(
+            PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY,
+            PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_DEFAULT_VALUE);
+    final long memoryLimitInBytes =
+        parameters.getLongOrDefault(
+            PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY,
+            PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE);
+    shouldSplitFile =
+        parameters.getBooleanOrDefault(
+            PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY,
+            PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_DEFAULT_VALUE);
+    LOGGER.info(
+        "DownSamplingProcessor in {} is initialized with {}: {}s, {}: {}, {}: 
{}.",
+        dataBaseName,
+        PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY,
+        intervalSeconds,
+        PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY,
+        memoryLimitInBytes,
+        PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY,
+        shouldSplitFile);
+
+    dataBaseNameWithPathSeparator = dataBaseName + 
TsFileConstant.PATH_SEPARATOR;
+    intervalInCurrentPrecision =
+        TimestampPrecisionUtils.convertToCurrPrecision(intervalSeconds, 
TimeUnit.SECONDS);
+
+    partialPathLastTimeCache = new 
PartialPathLastTimeCache(memoryLimitInBytes);
+  }
+
+  @Override
+  public void process(TabletInsertionEvent tabletInsertionEvent, 
EventCollector eventCollector)
+      throws Exception {
+    if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+        && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+      eventCollector.collect(tabletInsertionEvent);
+      return;
+    }
+
+    final AtomicReference<String> deviceSuffix = new AtomicReference<>();
+    final AtomicReference<Exception> exception = new AtomicReference<>();
+
+    tabletInsertionEvent
+        .processRowByRow(
+            (row, rowCollector) -> {
+              // To reduce the memory usage, we use the device suffix
+              // instead of the full path as the key.
+              if (deviceSuffix.get() == null) {
+                
deviceSuffix.set(row.getDeviceId().replaceFirst(dataBaseNameWithPathSeparator, 
""));
+              }
+
+              processRow(row, rowCollector, deviceSuffix.get(), exception);
+            })
+        .forEach(
+            event -> {
+              try {
+                eventCollector.collect(event);
+              } catch (Exception e) {
+                exception.set(e);
+              }
+            });
+
+    if (exception.get() != null) {
+      throw exception.get();
+    }
+  }
+
+  private void processRow(
+      Row row,
+      RowCollector rowCollector,
+      String deviceSuffix,
+      AtomicReference<Exception> exception) {
+    boolean hasNonNullMeasurements = false;
+
+    for (int index = 0, size = row.size(); index < size; ++index) {
+      if (row.isNull(index)) {
+        continue;
+      }
+
+      final String timeSeriesSuffix =
+          deviceSuffix + TsFileConstant.PATH_SEPARATOR + 
row.getColumnName(index);
+      final Long lastSampleTime = 
partialPathLastTimeCache.getPartialPathLastTime(timeSeriesSuffix);
+
+      if (lastSampleTime != null) {
+        if (Math.abs(row.getTime() - lastSampleTime) >= 
intervalInCurrentPrecision) {
+          hasNonNullMeasurements = true;
+          partialPathLastTimeCache.setPartialPathLastTime(timeSeriesSuffix, 
row.getTime());
+        }
+      } else {
+        hasNonNullMeasurements = true;
+        partialPathLastTimeCache.setPartialPathLastTime(timeSeriesSuffix, 
row.getTime());
+      }
+    }
+
+    if (hasNonNullMeasurements) {
+      try {
+        rowCollector.collectRow(row);
+      } catch (Exception e) {
+        exception.set(e);
+      }
+    }
+  }
+
+  /**
+   * If data comes in {@link TsFileInsertionEvent}, we will not split it into 
{@link
+   * TabletInsertionEvent} by default, because the data in {@link 
TsFileInsertionEvent} is already
+   * compressed, down-sampling will not reduce the size of data but will 
increase the CPU usage.
+   */
+  @Override
+  public void process(TsFileInsertionEvent tsFileInsertionEvent, 
EventCollector eventCollector)
+      throws Exception {
+    if (shouldSplitFile) {
+      try {
+        for (final TabletInsertionEvent tabletInsertionEvent :
+            tsFileInsertionEvent.toTabletInsertionEvents()) {
+          process(tabletInsertionEvent, eventCollector);
+        }
+      } finally {
+        tsFileInsertionEvent.close();
+      }
+    } else {
+      eventCollector.collect(tsFileInsertionEvent);
+    }
+  }
+
+  @Override
+  public void process(Event event, EventCollector eventCollector) throws 
Exception {
+    eventCollector.collect(event);
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (partialPathLastTimeCache != null) {
+      partialPathLastTimeCache.close();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastTimeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastTimeCache.java
new file mode 100644
index 00000000000..226dbe361f5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastTimeCache.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling;
+
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.utils.MemUtils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Weigher;
+import com.google.common.util.concurrent.AtomicDouble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Map-like component to look up for the last chosen time of a timeSeries. It 
has max size and
+ * timeSeries may fail to find its last time and must design the logic to 
handle this.
+ */
+public class PartialPathLastTimeCache implements AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PartialPathLastTimeCache.class);
+
+  private final PipeMemoryBlock allocatedMemoryBlock;
+  // Used to adjust the memory usage of the cache
+  private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
+
+  private final Cache<String, Long> partialPath2TimeCache;
+
+  public PartialPathLastTimeCache(long memoryLimitInBytes) {
+    allocatedMemoryBlock =
+        PipeResourceManager.memory()
+            .tryAllocate(memoryLimitInBytes)
+            .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
+            .setShrinkCallback(
+                (oldMemory, newMemory) -> {
+                  memoryUsageCheatFactor.set(
+                      memoryUsageCheatFactor.get() * ((double) oldMemory / 
newMemory));
+                  LOGGER.info(
+                      "PartialPathLastTimeCache.allocatedMemoryBlock has 
shrunk from {} to {}.",
+                      oldMemory,
+                      newMemory);
+                })
+            .setExpandMethod(oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, 
memoryLimitInBytes))
+            .setExpandCallback(
+                (oldMemory, newMemory) -> {
+                  memoryUsageCheatFactor.set(
+                      memoryUsageCheatFactor.get() / ((double) newMemory / 
oldMemory));
+                  LOGGER.info(
+                      "PartialPathLastTimeCache.allocatedMemoryBlock has 
expanded from {} to {}.",
+                      oldMemory,
+                      newMemory);
+                });
+
+    // Currently disable the metric here because it's not a constant cache and 
the number may
+    // fluctuate. In the future all the "processorCache"s may be recorded in 
single metric entry
+    partialPath2TimeCache =
+        Caffeine.newBuilder()
+            .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
+            .weigher(
+                // Here partial path is a part of full path adequate to 
inspect the last time
+                (Weigher<String, Long>)
+                    (partialPath, timeStamp) -> {
+                      final long weightInLong =
+                          (long)
+                              ((MemUtils.getStringMem(partialPath) + 
Long.BYTES)
+                                  * memoryUsageCheatFactor.get());
+                      if (weightInLong <= 0) {
+                        return Integer.MAX_VALUE;
+                      }
+                      final int weightInInt = (int) weightInLong;
+                      return weightInInt != weightInLong ? Integer.MAX_VALUE : 
weightInInt;
+                    })
+            .build();
+  }
+
+  /////////////////////////// Getter & Setter ///////////////////////////
+
+  public Long getPartialPathLastTime(String partialPath) {
+    return partialPath2TimeCache.getIfPresent(partialPath);
+  }
+
+  public void setPartialPathLastTime(String partialPath, long timeStamp) {
+    partialPath2TimeCache.put(partialPath, timeStamp);
+  }
+
+  /////////////////////////// Close ///////////////////////////
+
+  @Override
+  public void close() throws Exception {
+    partialPath2TimeCache.invalidateAll();
+    allocatedMemoryBlock.close();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 642683fb703..e0306eea7da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
-import java.util.function.Function;
+import java.util.function.LongUnaryOperator;
 
 public class PipeMemoryBlock implements AutoCloseable {
 
@@ -41,9 +41,9 @@ public class PipeMemoryBlock implements AutoCloseable {
 
   private final AtomicLong memoryUsageInBytes = new AtomicLong(0);
 
-  private final AtomicReference<Function<Long, Long>> shrinkMethod = new 
AtomicReference<>();
+  private final AtomicReference<LongUnaryOperator> shrinkMethod = new 
AtomicReference<>();
   private final AtomicReference<BiConsumer<Long, Long>> shrinkCallback = new 
AtomicReference<>();
-  private final AtomicReference<Function<Long, Long>> expandMethod = new 
AtomicReference<>();
+  private final AtomicReference<LongUnaryOperator> expandMethod = new 
AtomicReference<>();
   private final AtomicReference<BiConsumer<Long, Long>> expandCallback = new 
AtomicReference<>();
 
   private volatile boolean isReleased = false;
@@ -60,7 +60,7 @@ public class PipeMemoryBlock implements AutoCloseable {
     this.memoryUsageInBytes.set(memoryUsageInBytes);
   }
 
-  public PipeMemoryBlock setShrinkMethod(Function<Long, Long> shrinkMethod) {
+  public PipeMemoryBlock setShrinkMethod(LongUnaryOperator shrinkMethod) {
     this.shrinkMethod.set(shrinkMethod);
     return this;
   }
@@ -70,7 +70,7 @@ public class PipeMemoryBlock implements AutoCloseable {
     return this;
   }
 
-  public PipeMemoryBlock setExpandMethod(Function<Long, Long> extendMethod) {
+  public PipeMemoryBlock setExpandMethod(LongUnaryOperator extendMethod) {
     this.expandMethod.set(extendMethod);
     return this;
   }
@@ -97,7 +97,7 @@ public class PipeMemoryBlock implements AutoCloseable {
     }
 
     final long oldMemorySizeInBytes = memoryUsageInBytes.get();
-    final long newMemorySizeInBytes = 
shrinkMethod.get().apply(memoryUsageInBytes.get());
+    final long newMemorySizeInBytes = 
shrinkMethod.get().applyAsLong(memoryUsageInBytes.get());
 
     final long memoryInBytesCanBeReleased = oldMemorySizeInBytes - 
newMemorySizeInBytes;
     if (memoryInBytesCanBeReleased <= 0
@@ -132,7 +132,7 @@ public class PipeMemoryBlock implements AutoCloseable {
     }
 
     final long oldMemorySizeInBytes = memoryUsageInBytes.get();
-    final long newMemorySizeInBytes = 
expandMethod.get().apply(memoryUsageInBytes.get());
+    final long newMemorySizeInBytes = 
expandMethod.get().applyAsLong(memoryUsageInBytes.get());
 
     final long memoryInBytesNeededToBeAllocated = newMemorySizeInBytes - 
oldMemorySizeInBytes;
     if (memoryInBytesNeededToBeAllocated <= 0
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 4bcf0e44c76..99de8bc7b00 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.task.stage;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
+import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
 import 
org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
@@ -70,7 +70,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
 
       // 2. customize processor
       final PipeProcessorRuntimeConfiguration runtimeConfiguration =
-          new PipeTaskRuntimeConfiguration(new 
PipeTaskRuntimeEnvironment(pipeName, creationTime));
+          new PipeTaskRuntimeConfiguration(
+              new PipeTaskProcessorRuntimeEnvironment(
+                  pipeName, creationTime, dataRegionId.getId()));
       pipeProcessor.customize(pipeProcessorParameters, runtimeConfiguration);
     } catch (Exception e) {
       throw new PipeException(e.getMessage(), e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 7cc10aadf25..a7cd9b8ac0d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -59,11 +59,11 @@ public class WALInsertNodeCache {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WALInsertNodeCache.class);
   private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
-  // LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
   private final PipeMemoryBlock allocatedMemoryBlock;
   // Used to adjust the memory usage of the cache
   private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
   private final AtomicBoolean isBatchLoadEnabled = new AtomicBoolean(true);
+  // LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
   private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>> 
lruCache;
 
   // ids of all pinned memTables
@@ -80,7 +80,7 @@ public class WALInsertNodeCache {
     allocatedMemoryBlock =
         PipeResourceManager.memory()
             .tryAllocate(requestedAllocateSize)
-            .setShrinkMethod((oldMemory) -> Math.max(oldMemory / 2, 1))
+            .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
             .setShrinkCallback(
                 (oldMemory, newMemory) -> {
                   memoryUsageCheatFactor.set(
@@ -93,14 +93,14 @@ public class WALInsertNodeCache {
                       newMemory);
                 })
             .setExpandMethod(
-                (oldMemory) -> Math.min(Math.max(oldMemory, 1) * 2, 
requestedAllocateSize))
+                oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, 
requestedAllocateSize))
             .setExpandCallback(
                 (oldMemory, newMemory) -> {
                   memoryUsageCheatFactor.set(
                       memoryUsageCheatFactor.get() / ((double) newMemory / 
oldMemory));
                   isBatchLoadEnabled.set(newMemory >= 
CONFIG.getWalFileSizeThresholdInByte());
                   LOGGER.info(
-                      "WALInsertNodeCache allocatedMemoryBlock of dataRegion 
{} has expanded from {} to {}.",
+                      "WALInsertNodeCache.allocatedMemoryBlock of dataRegion 
{} has expanded from {} to {}.",
                       dataRegionId,
                       oldMemory,
                       newMemory);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 96c6bbae38f..f2063aa80e2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -30,7 +30,10 @@ import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.WebSocketConnector
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.WriteBackConnector;
 import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
+import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.DownSamplingProcessor;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -43,6 +46,7 @@ public enum BuiltinPipePlugin {
 
   // processors
   DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class),
+  DOWN_SAMPLING_PROCESSOR("down-sampling-processor", 
DownSamplingProcessor.class),
 
   // connectors
   DO_NOTHING_CONNECTOR("do-nothing-connector", DoNothingConnector.class),
@@ -88,24 +92,27 @@ public enum BuiltinPipePlugin {
     return className;
   }
 
-  public static final Set<String> SHOW_PIPE_PLUGINS_BLACKLIST = new 
HashSet<>();
-
-  static {
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_EXTRACTOR.getPipePluginName().toUpperCase());
-
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(OPC_UA_CONNECTOR.getPipePluginName().toUpperCase());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase());
-
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_SINK.getPipePluginName().toUpperCase());
-  }
+  public static final Set<String> SHOW_PIPE_PLUGINS_BLACKLIST =
+      Collections.unmodifiableSet(
+          new HashSet<>(
+              Arrays.asList(
+                  // Extractors
+                  IOTDB_EXTRACTOR.getPipePluginName().toUpperCase(),
+                  // Processors
+                  DOWN_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
+                  // Connectors
+                  DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
+                  IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase(),
+                  
IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(),
+                  
IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
+                  
IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(),
+                  IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(),
+                  WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(),
+                  OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(),
+                  WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(),
+                  // Sinks
+                  IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(),
+                  IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
+                  IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(),
+                  WEBSOCKET_SINK.getPipePluginName().toUpperCase())));
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DownSamplingProcessor.java
similarity index 63%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DownSamplingProcessor.java
index 8caa87c03ec..1658876fa10 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DownSamplingProcessor.java
@@ -17,13 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.config.constant;
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor;
 
-public class PipeProcessorConstant {
-
-  public static final String PROCESSOR_KEY = "processor";
-
-  private PipeProcessorConstant() {
-    throw new IllegalStateException("Utility class");
-  }
-}
+/**
+ * This class is a placeholder and should not be initialized. It represents 
the Down Sampling
+ * processor. There is a real implementation in the server module but cannot 
be imported here. The
+ * pipe agent in the server module will replace this class with the real 
implementation when
+ * initializing the Down Sampling processor.
+ */
+public class DownSamplingProcessor extends PlaceHolderProcessor {}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/PlaceHolderProcessor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/PlaceHolderProcessor.java
new file mode 100644
index 00000000000..431c6ece304
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/PlaceHolderProcessor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents 
the all the IoTDB pipe
+ * processors that can not be implemented in the node-commons module. Each 
IoTDB pipe processor has
+ * a real implementation in the server module but cannot be imported here. The 
pipe agent in the
+ * server module will replace this class with the real implementation when 
initializing the IoTDB
+ * pipe processor.
+ */
+public class PlaceHolderProcessor implements PipeProcessor {
+
+  private static final String PLACEHOLDER_ERROR_MSG =
+      "This class is a placeholder and should not be used.";
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, 
PipeProcessorRuntimeConfiguration configuration)
+      throws Exception {
+    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
+  }
+
+  @Override
+  public void process(TabletInsertionEvent tabletInsertionEvent, 
EventCollector eventCollector)
+      throws Exception {
+    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
+  }
+
+  @Override
+  public void process(Event event, EventCollector eventCollector) throws 
Exception {
+    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
+  }
+
+  @Override
+  public void close() throws Exception {
+    throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
+  }
+}
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
index ac1f6190c2d..c4383231f90 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
@@ -678,6 +678,16 @@ public class BytesUtils {
     return result;
   }
 
+  /**
+   * Return the deepCopy of the given byte array.
+   *
+   * @param src input byte array
+   * @return byte array
+   */
+  public static byte[] deepCopy(byte[] src) {
+    return subBytes(src, 0, src.length);
+  }
+
   /**
    * cut out specified length byte array from parameter start from input byte 
array src and return.
    *

Reply via email to