This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new aa547fb3889 Pipe: implemented sdt-sampling-processor & refactored down-sampling-processor (#12114)
aa547fb3889 is described below

commit aa547fb38895d5cc0d95212d5adef76bcd021909
Author: Xuan Ronaldo <[email protected]>
AuthorDate: Thu Mar 14 17:12:22 2024 +0800

    Pipe: implemented sdt-sampling-processor & refactored down-sampling-processor (#12114)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java |  30 ++-
 .../pipe/it/autocreate/IoTDBPipeProcessorIT.java   |   6 +-
 .../PipeDataRegionProcessorConstructor.java        |   9 +-
 .../pipe/event/common/row/PipeRemarkableRow.java   |  55 ++++++
 .../iotdb/db/pipe/event/common/row/PipeRow.java    |  18 +-
 .../downsampling/DownSamplingProcessor.java        | 122 +++++--------
 ...eCache.java => PartialPathLastObjectCache.java} |  38 ++--
 .../sdt/SwingingDoorTrendingFilter.java            | 145 +++++++++++++++
 .../sdt/SwingingDoorTrendingSamplingProcessor.java | 201 +++++++++++++++++++++
 .../tumbling/TumblingTimeSamplingProcessor.java    | 135 ++++++++++++++
 .../config/constant/PipeProcessorConstant.java     |  17 +-
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |  10 +-
 ... => SwingingDoorTrendingSamplingProcessor.java} |  10 +-
 ...sor.java => TumblingTimeSamplingProcessor.java} |  10 +-
 14 files changed, 667 insertions(+), 139 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
index cad1e28ef64..80c87688bc1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
@@ -141,7 +141,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
     // alter pipe (replace)
     try (Connection connection = senderEnv.getConnection();
         Statement statement = connection.createStatement()) {
-      statement.execute("alter pipe a2b replace processor 
('processor'='down-sampling-processor')");
+      statement.execute(
+          "alter pipe a2b replace processor 
('processor'='tumbling-time-sampling-processor')");
     } catch (SQLException e) {
       fail(e.getMessage());
     }
@@ -155,7 +156,10 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
       Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
       // check configurations
       Assert.assertTrue(
-          
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+          showPipeResult
+              .get(0)
+              .pipeProcessor
+              .contains("processor=tumbling-time-sampling-processor"));
       
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=false"));
       Assert.assertTrue(
           showPipeResult
@@ -187,7 +191,10 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
       // check configurations
       
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
       Assert.assertTrue(
-          
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+          showPipeResult
+              .get(0)
+              .pipeProcessor
+              .contains("processor=tumbling-time-sampling-processor"));
       Assert.assertTrue(
           showPipeResult
               .get(0)
@@ -218,7 +225,10 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
       // check configurations
       
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
       Assert.assertFalse(
-          
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+          showPipeResult
+              .get(0)
+              .pipeProcessor
+              .contains("processor=tumbling-time-sampling-processor"));
       Assert.assertTrue(
           showPipeResult
               .get(0)
@@ -249,7 +259,10 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
       // check configurations
       
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
       Assert.assertFalse(
-          
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+          showPipeResult
+              .get(0)
+              .pipeProcessor
+              .contains("processor=tumbling-time-sampling-processor"));
       Assert.assertTrue(
           showPipeResult
               .get(0)
@@ -299,7 +312,7 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
     // create pipe
     String sql =
         String.format(
-            "create pipe a2b with processor 
('processor'='down-sampling-processor', 'down-sampling.interval-seconds'='1', 
'down-sampling.split-file'='true') with sink ('node-urls'='%s', 
'batch.enable'='false')",
+            "create pipe a2b with processor 
('processor'='tumbling-time-sampling-processor', 
'processor.tumbling-time.interval-seconds'='1', 
'processor.down-sampling.split-file'='true') with sink ('node-urls'='%s', 
'batch.enable'='false')",
             receiverDataNode.getIpAndPortString());
     try (Connection connection = senderEnv.getConnection();
         Statement statement = connection.createStatement()) {
@@ -325,10 +338,11 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
     TestUtils.assertDataEventuallyOnEnv(
         receiverEnv, "select * from root.**", "Time,root.db.d1.at1,", 
expectedResSet);
 
-    // alter pipe (modify 'down-sampling.interval-seconds')
+    // alter pipe (modify 'processor.tumbling-time.interval-seconds')
     try (Connection connection = senderEnv.getConnection();
         Statement statement = connection.createStatement()) {
-      statement.execute("alter pipe a2b modify processor 
('down-sampling.interval-seconds'='2')");
+      statement.execute(
+          "alter pipe a2b modify processor 
('processor.tumbling-time.interval-seconds'='2')");
     } catch (SQLException e) {
       fail(e.getMessage());
     }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
index 62d30818657..fe12fab2bf1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
@@ -62,7 +62,7 @@ public class IoTDBPipeProcessorIT extends 
AbstractPipeDualAutoIT {
   }
 
   @Test
-  public void testDownSamplingProcessor() throws Exception {
+  public void testTumblingTimeSamplingProcessor() throws Exception {
     DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     String receiverIp = receiverDataNode.getIp();
@@ -87,8 +87,8 @@ public class IoTDBPipeProcessorIT extends 
AbstractPipeDualAutoIT {
 
       extractorAttributes.put("source.realtime.mode", "log");
 
-      processorAttributes.put("processor", "down-sampling-processor");
-      processorAttributes.put("processor.down-sampling.interval-seconds", 
"20");
+      processorAttributes.put("processor", "tumbling-time-sampling-processor");
+      processorAttributes.put("processor.tumbling-time.interval-seconds", 
"20");
       processorAttributes.put("processor.down-sampling.split-file", "true");
 
       connectorAttributes.put("sink", "iotdb-thrift-sink");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index 1cdbbb34221..ba49f55ff32 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -23,7 +23,8 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.PipeProcessorConstructor;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor;
 import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
-import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor;
+import 
org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
+import 
org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
 
 class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
 
@@ -36,6 +37,10 @@ class PipeDataRegionProcessorConstructor extends 
PipeProcessorConstructor {
     pluginConstructors.put(
         BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(), 
DoNothingProcessor::new);
     pluginConstructors.put(
-        BuiltinPipePlugin.DOWN_SAMPLING_PROCESSOR.getPipePluginName(), 
DownSamplingProcessor::new);
+        BuiltinPipePlugin.TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName(),
+        TumblingTimeSamplingProcessor::new);
+    pluginConstructors.put(
+        BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(),
+        SwingingDoorTrendingSamplingProcessor::new);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRemarkableRow.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRemarkableRow.java
new file mode 100644
index 00000000000..3dcbb43feee
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRemarkableRow.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.row;
+
+public class PipeRemarkableRow extends PipeRow {
+
+  private final boolean[] isNullMarked;
+
+  public PipeRemarkableRow(PipeRow pipeRow) {
+    super(
+        pipeRow.rowIndex,
+        pipeRow.deviceId,
+        pipeRow.isAligned,
+        pipeRow.measurementSchemaList,
+        pipeRow.timestampColumn,
+        pipeRow.valueColumnTypes,
+        pipeRow.valueColumns,
+        pipeRow.bitMaps,
+        pipeRow.columnNameStringList);
+
+    // Copy the isNullMarked array from the original row
+    // to avoid modifying the original row.
+    final int columnCount = pipeRow.measurementSchemaList.length;
+    isNullMarked = new boolean[columnCount];
+    for (int i = 0; i < columnCount; i++) {
+      isNullMarked[i] = super.isNull(i);
+    }
+  }
+
+  @Override
+  public boolean isNull(int index) {
+    return isNullMarked[index];
+  }
+
+  public void markNull(int index) {
+    isNullMarked[index] = true;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
index 87f36db8191..eb872283862 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
@@ -33,18 +33,18 @@ import java.util.List;
 
 public class PipeRow implements Row {
 
-  private final int rowIndex;
+  protected final int rowIndex;
 
-  private final String deviceId;
-  private final boolean isAligned;
-  private final MeasurementSchema[] measurementSchemaList;
+  protected final String deviceId;
+  protected final boolean isAligned;
+  protected final MeasurementSchema[] measurementSchemaList;
 
-  private final long[] timestampColumn;
-  private final TSDataType[] valueColumnTypes;
-  private final Object[] valueColumns;
-  private final BitMap[] bitMaps;
+  protected final long[] timestampColumn;
+  protected final TSDataType[] valueColumnTypes;
+  protected final Object[] valueColumns;
+  protected final BitMap[] bitMaps;
 
-  private final String[] columnNameStringList;
+  protected final String[] columnNameStringList;
 
   public PipeRow(
       int rowIndex,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
index 8b7278d0e0a..225ce14efe4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
@@ -24,7 +24,6 @@ import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeE
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.storageengine.StorageEngine;
-import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -37,73 +36,63 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY;
 
-public class DownSamplingProcessor implements PipeProcessor {
+public abstract class DownSamplingProcessor implements PipeProcessor {
+
+  protected long memoryLimitInBytes;
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(DownSamplingProcessor.class);
+  protected boolean shouldSplitFile;
 
-  private String dataBaseNameWithPathSeparator;
-  private long intervalInCurrentPrecision;
-  private boolean shouldSplitFile;
+  protected String dataBaseNameWithPathSeparator;
 
-  private PartialPathLastTimeCache partialPathLastTimeCache;
+  protected PartialPathLastObjectCache<?> pathLastObjectCache;
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
-    // No need to validate.
+    memoryLimitInBytes =
+        validator
+            .getParameters()
+            .getLongOrDefault(
+                PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY,
+                PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE);
+
+    validator.validate(
+        memoryLimitInBytes -> (Long) memoryLimitInBytes > 0,
+        String.format(
+            "%s must be > 0, but got %s",
+            PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY, 
memoryLimitInBytes),
+        memoryLimitInBytes);
   }
 
   @Override
-  public void customize(PipeParameters parameters, 
PipeProcessorRuntimeConfiguration configuration)
-      throws Exception {
-    final String dataBaseName =
-        StorageEngine.getInstance()
-            .getDataRegion(
-                new DataRegionId(
-                    ((PipeTaskProcessorRuntimeEnvironment) 
configuration.getRuntimeEnvironment())
-                        .getRegionId()))
-            .getDatabaseName();
-    final long intervalSeconds =
-        parameters.getLongOrDefault(
-            PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY,
-            PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_DEFAULT_VALUE);
-    final long memoryLimitInBytes =
-        parameters.getLongOrDefault(
-            PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY,
-            PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE);
+  public void customize(
+      PipeParameters parameters, PipeProcessorRuntimeConfiguration 
configuration) {
     shouldSplitFile =
         parameters.getBooleanOrDefault(
             PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY,
             PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_DEFAULT_VALUE);
-    LOGGER.info(
-        "DownSamplingProcessor in {} is initialized with {}: {}s, {}: {}, {}: 
{}.",
-        dataBaseName,
-        PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY,
-        intervalSeconds,
-        PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY,
-        memoryLimitInBytes,
-        PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY,
-        shouldSplitFile);
-
-    dataBaseNameWithPathSeparator = dataBaseName + 
TsFileConstant.PATH_SEPARATOR;
-    intervalInCurrentPrecision =
-        TimestampPrecisionUtils.convertToCurrPrecision(intervalSeconds, 
TimeUnit.SECONDS);
-
-    partialPathLastTimeCache = new 
PartialPathLastTimeCache(memoryLimitInBytes);
+
+    dataBaseNameWithPathSeparator =
+        StorageEngine.getInstance()
+                .getDataRegion(
+                    new DataRegionId(
+                        ((PipeTaskProcessorRuntimeEnvironment)
+                                configuration.getRuntimeEnvironment())
+                            .getRegionId()))
+                .getDatabaseName()
+            + TsFileConstant.PATH_SEPARATOR;
+
+    pathLastObjectCache = initPathLastObjectCache(memoryLimitInBytes);
   }
 
+  protected abstract PartialPathLastObjectCache<?> 
initPathLastObjectCache(long memoryLimitInBytes);
+
   @Override
   public void process(TabletInsertionEvent tabletInsertionEvent, 
EventCollector eventCollector)
       throws Exception {
@@ -122,7 +111,8 @@ public class DownSamplingProcessor implements PipeProcessor 
{
               // To reduce the memory usage, we use the device suffix
               // instead of the full path as the key.
               if (deviceSuffix.get() == null) {
-                
deviceSuffix.set(row.getDeviceId().replaceFirst(dataBaseNameWithPathSeparator, 
""));
+                deviceSuffix.set(
+                    
row.getDeviceId().replaceFirst(this.dataBaseNameWithPathSeparator, ""));
               }
 
               processRow(row, rowCollector, deviceSuffix.get(), exception);
@@ -141,41 +131,11 @@ public class DownSamplingProcessor implements 
PipeProcessor {
     }
   }
 
-  private void processRow(
+  protected abstract void processRow(
       Row row,
       RowCollector rowCollector,
       String deviceSuffix,
-      AtomicReference<Exception> exception) {
-    boolean hasNonNullMeasurements = false;
-
-    for (int index = 0, size = row.size(); index < size; ++index) {
-      if (row.isNull(index)) {
-        continue;
-      }
-
-      final String timeSeriesSuffix =
-          deviceSuffix + TsFileConstant.PATH_SEPARATOR + 
row.getColumnName(index);
-      final Long lastSampleTime = 
partialPathLastTimeCache.getPartialPathLastTime(timeSeriesSuffix);
-
-      if (lastSampleTime != null) {
-        if (Math.abs(row.getTime() - lastSampleTime) >= 
intervalInCurrentPrecision) {
-          hasNonNullMeasurements = true;
-          partialPathLastTimeCache.setPartialPathLastTime(timeSeriesSuffix, 
row.getTime());
-        }
-      } else {
-        hasNonNullMeasurements = true;
-        partialPathLastTimeCache.setPartialPathLastTime(timeSeriesSuffix, 
row.getTime());
-      }
-    }
-
-    if (hasNonNullMeasurements) {
-      try {
-        rowCollector.collectRow(row);
-      } catch (Exception e) {
-        exception.set(e);
-      }
-    }
-  }
+      AtomicReference<Exception> exception);
 
   /**
    * If data comes in {@link TsFileInsertionEvent}, we will not split it into 
{@link
@@ -207,8 +167,8 @@ public class DownSamplingProcessor implements PipeProcessor 
{
 
   @Override
   public void close() throws Exception {
-    if (partialPathLastTimeCache != null) {
-      partialPathLastTimeCache.close();
+    if (pathLastObjectCache != null) {
+      pathLastObjectCache.close();
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastTimeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
similarity index 77%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastTimeCache.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
index 226dbe361f5..bbee9babed9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastTimeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
@@ -30,21 +30,17 @@ import com.google.common.util.concurrent.AtomicDouble;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * Map-like component to look up for the last chosen time of a timeSeries. It 
has max size and
- * timeSeries may fail to find its last time and must design the logic to 
handle this.
- */
-public class PartialPathLastTimeCache implements AutoCloseable {
+public abstract class PartialPathLastObjectCache<T> implements AutoCloseable {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PartialPathLastTimeCache.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PartialPathLastObjectCache.class);
 
   private final PipeMemoryBlock allocatedMemoryBlock;
   // Used to adjust the memory usage of the cache
   private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
 
-  private final Cache<String, Long> partialPath2TimeCache;
+  private final Cache<String, T> partialPath2ObjectCache;
 
-  public PartialPathLastTimeCache(long memoryLimitInBytes) {
+  public PartialPathLastObjectCache(long memoryLimitInBytes) {
     allocatedMemoryBlock =
         PipeResourceManager.memory()
             .tryAllocate(memoryLimitInBytes)
@@ -54,7 +50,7 @@ public class PartialPathLastTimeCache implements 
AutoCloseable {
                   memoryUsageCheatFactor.set(
                       memoryUsageCheatFactor.get() * ((double) oldMemory / 
newMemory));
                   LOGGER.info(
-                      "PartialPathLastTimeCache.allocatedMemoryBlock has 
shrunk from {} to {}.",
+                      "PartialPathLastObjectCache.allocatedMemoryBlock has 
shrunk from {} to {}.",
                       oldMemory,
                       newMemory);
                 })
@@ -64,23 +60,23 @@ public class PartialPathLastTimeCache implements 
AutoCloseable {
                   memoryUsageCheatFactor.set(
                       memoryUsageCheatFactor.get() / ((double) newMemory / 
oldMemory));
                   LOGGER.info(
-                      "PartialPathLastTimeCache.allocatedMemoryBlock has 
expanded from {} to {}.",
+                      "PartialPathLastObjectCache.allocatedMemoryBlock has 
expanded from {} to {}.",
                       oldMemory,
                       newMemory);
                 });
 
     // Currently disable the metric here because it's not a constant cache and 
the number may
     // fluctuate. In the future all the "processorCache"s may be recorded in 
single metric entry
-    partialPath2TimeCache =
+    partialPath2ObjectCache =
         Caffeine.newBuilder()
             .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
             .weigher(
-                // Here partial path is a part of full path adequate to 
inspect the last time
-                (Weigher<String, Long>)
-                    (partialPath, timeStamp) -> {
+                // Here partial path is a part of full path adequate to 
inspect the last object
+                (Weigher<String, T>)
+                    (partialPath, object) -> {
                       final long weightInLong =
                           (long)
-                              ((MemUtils.getStringMem(partialPath) + 
Long.BYTES)
+                              ((MemUtils.getStringMem(partialPath) + 
calculateMemoryUsage(object))
                                   * memoryUsageCheatFactor.get());
                       if (weightInLong <= 0) {
                         return Integer.MAX_VALUE;
@@ -91,21 +87,23 @@ public class PartialPathLastTimeCache implements 
AutoCloseable {
             .build();
   }
 
+  protected abstract long calculateMemoryUsage(T object);
+
   /////////////////////////// Getter & Setter ///////////////////////////
 
-  public Long getPartialPathLastTime(String partialPath) {
-    return partialPath2TimeCache.getIfPresent(partialPath);
+  public T getPartialPathLastObject(String partialPath) {
+    return partialPath2ObjectCache.getIfPresent(partialPath);
   }
 
-  public void setPartialPathLastTime(String partialPath, long timeStamp) {
-    partialPath2TimeCache.put(partialPath, timeStamp);
+  public void setPartialPathLastObject(String partialPath, T object) {
+    partialPath2ObjectCache.put(partialPath, object);
   }
 
   /////////////////////////// Close ///////////////////////////
 
   @Override
   public void close() throws Exception {
-    partialPath2TimeCache.invalidateAll();
+    partialPath2ObjectCache.invalidateAll();
     allocatedMemoryBlock.close();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
new file mode 100644
index 00000000000..f80b189427a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.sdt;
+
+import java.util.Objects;
+
+/**
+ * Per-series state of a swinging-door-trending (SDT) style filter.
+ *
+ * <p>Each incoming point is compared against the last stored point. {@link #filter(long, Object)}
+ * returns {@code true} when the point opens a new window (max interval reached, a boolean/string
+ * value changed, the doors crossed, or an error forced a re-initialization) and {@code false}
+ * otherwise. NOTE(review): callers presumably keep the point on {@code true} and drop it on
+ * {@code false} - confirm against SwingingDoorTrendingSamplingProcessor.
+ *
+ * @param <T> value type of the series; non-boolean, non-string values are converted to double
+ *     through {@code Double.parseDouble(value.toString())}
+ */
+public class SwingingDoorTrendingFilter<T> {
+
+  /** Supplies the compression deviation and the min/max time intervals. */
+  private final SwingingDoorTrendingSamplingProcessor processor;
+
+  /**
+   * Maximum of {@code (valueDiff - deviation) / timeDiff} seen since the doors were last reset.
+   * Within one window the upper door can only open upwards.
+   */
+  private double upperDoor;
+
+  /**
+   * Minimum of {@code (valueDiff + deviation) / timeDiff} seen since the doors were last reset.
+   * Within one window the lower door can only open downwards.
+   */
+  private double lowerDoor;
+
+  /**
+   * Timestamp and value of the last point that went through the slope comparison. When the doors
+   * cross ({@code upperDoor > lowerDoor}) this point becomes the new last stored point.
+   */
+  private long lastReadTimestamp;
+
+  private T lastReadValue;
+
+  /** Timestamp and value of the last stored point; every new point is compared against it. */
+  private long lastStoredTimestamp;
+
+  private T lastStoredValue;
+
+  /**
+   * Creates a filter whose first stored point is {@code (firstTimestamp, firstValue)}.
+   *
+   * @param processor source of the compression parameters
+   * @param firstTimestamp timestamp of the first point of the series
+   * @param firstValue value of the first point of the series
+   */
+  public SwingingDoorTrendingFilter(
+      SwingingDoorTrendingSamplingProcessor processor, long firstTimestamp, T firstValue) {
+    this.processor = processor;
+    init(firstTimestamp, firstValue);
+  }
+
+  /** Starts from scratch: the given point becomes both the last read and the last stored point. */
+  private void init(long firstTimestamp, T firstValue) {
+    reset(firstTimestamp, firstValue);
+
+    lastReadTimestamp = firstTimestamp;
+    lastReadValue = firstValue;
+  }
+
+  /**
+   * Feeds one point into the filter.
+   *
+   * <p>If the comparison throws (for example the value cannot be parsed as a double), the filter
+   * is re-initialized with this point and {@code true} is returned.
+   *
+   * @param timestamp timestamp of the point
+   * @param value value of the point
+   * @return {@code true} if the point opened a new window, {@code false} otherwise
+   */
+  public boolean filter(long timestamp, T value) {
+    try {
+      return tryFilter(timestamp, value);
+    } catch (Exception e) {
+      // Deliberate best effort: restart from this point instead of failing the whole event.
+      init(timestamp, value);
+      return true;
+    }
+  }
+
+  private boolean tryFilter(long timestamp, T value) {
+    final long timeDiff = timestamp - lastStoredTimestamp;
+    final long absTimeDiff = Math.abs(timeDiff);
+
+    // Too close to the last stored point: never a new window.
+    if (absTimeDiff <= processor.getCompressionMinTimeInterval()) {
+      return false;
+    }
+
+    // Too far from the last stored point: always a new window.
+    if (absTimeDiff >= processor.getCompressionMaxTimeInterval()) {
+      reset(timestamp, value);
+      return true;
+    }
+
+    // For boolean and string type, we only compare the value
+    if (value instanceof Boolean || value instanceof String) {
+      if (Objects.equals(lastStoredValue, value)) {
+        return false;
+      }
+
+      reset(timestamp, value);
+      return true;
+    }
+
+    // For other numerical types, we compare the value and the time difference
+    final double doubleValue = Double.parseDouble(value.toString());
+    final double lastStoredDoubleValue = Double.parseDouble(lastStoredValue.toString());
+    final double valueDiff = doubleValue - lastStoredDoubleValue;
+
+    final double currentUpperSlope = (valueDiff - processor.getCompressionDeviation()) / timeDiff;
+    if (currentUpperSlope > upperDoor) {
+      upperDoor = currentUpperSlope;
+    }
+
+    final double currentLowerSlope = (valueDiff + processor.getCompressionDeviation()) / timeDiff;
+    if (currentLowerSlope < lowerDoor) {
+      lowerDoor = currentLowerSlope;
+    }
+
+    if (upperDoor > lowerDoor) {
+      // The doors crossed: the previously read point becomes the stored point, and the doors
+      // restart from the slopes of the current point.
+      lastStoredTimestamp = lastReadTimestamp;
+      lastStoredValue = lastReadValue;
+
+      upperDoor = currentUpperSlope;
+      lowerDoor = currentLowerSlope;
+
+      lastReadValue = value;
+      lastReadTimestamp = timestamp;
+
+      return true;
+    }
+
+    lastReadValue = value;
+    lastReadTimestamp = timestamp;
+
+    return false;
+  }
+
+  /** Closes both doors and makes the given point the last stored point. */
+  private void reset(long timestamp, T value) {
+    // Double.MIN_VALUE is the smallest POSITIVE double, so it must not be used as the "lowest
+    // possible slope": negative upper slopes would never be able to move the upper door.
+    upperDoor = -Double.MAX_VALUE;
+    lowerDoor = Double.MAX_VALUE;
+
+    lastStoredTimestamp = timestamp;
+    lastStoredValue = value;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java
new file mode 100644
index 00000000000..ebfde47671e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.sdt;
+
+import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRemarkableRow;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
+import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor;
+import 
org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache;
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class SwingingDoorTrendingSamplingProcessor extends 
DownSamplingProcessor {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SwingingDoorTrendingSamplingProcessor.class);
+
+  /**
+   * the maximum absolute difference the user set if the data's value is within
+   * compressionDeviation, it will be compressed and discarded after 
compression, it will only store
+   * out of range (time, data) to form the trend
+   */
+  private double compressionDeviation;
+
+  /**
+   * the minimum time distance between two stored data points if current point 
time to the last
+   * stored point time distance <= compressionMinTimeInterval, current point 
will NOT be stored
+   * regardless of compression deviation
+   */
+  private long compressionMinTimeInterval;
+
+  /**
+   * the maximum time distance between two stored data points if current point 
time to the last
+   * stored point time distance >= compressionMaxTimeInterval, current point 
will be stored
+   * regardless of compression deviation
+   */
+  private long compressionMaxTimeInterval;
+
+  private PartialPathLastObjectCache<SwingingDoorTrendingFilter<?>> 
pathLastObjectCache;
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    super.validate(validator);
+
+    final PipeParameters parameters = validator.getParameters();
+    compressionDeviation =
+        parameters.getDoubleOrDefault(
+            PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY,
+            
PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_DEFAULT_VALUE);
+    compressionMinTimeInterval =
+        parameters.getLongOrDefault(
+            PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY,
+            
PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_DEFAULT_VALUE);
+    compressionMaxTimeInterval =
+        parameters.getLongOrDefault(
+            PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY,
+            
PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE);
+
+    validator
+        .validate(
+            compressionDeviation -> (Double) compressionDeviation >= 0,
+            String.format(
+                "%s must be >= 0, but got %s",
+                PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY,
+                compressionDeviation),
+            compressionDeviation)
+        .validate(
+            compressionMinTimeInterval -> (Long) compressionMinTimeInterval >= 
0,
+            String.format(
+                "%s must be >= 0, but got %s",
+                PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY,
+                compressionMinTimeInterval),
+            compressionMinTimeInterval)
+        .validate(
+            compressionMaxTimeInterval -> (Long) compressionMaxTimeInterval >= 
0,
+            String.format(
+                "%s must be >= 0, but got %s",
+                PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY,
+                compressionMaxTimeInterval),
+            compressionMaxTimeInterval)
+        .validate(
+            minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1],
+            String.format(
+                "%s must be <= %s, but got %s and %s",
+                PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY,
+                PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY,
+                compressionMinTimeInterval,
+                compressionMaxTimeInterval),
+            compressionMinTimeInterval,
+            compressionMaxTimeInterval);
+  }
+
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeProcessorRuntimeConfiguration 
configuration) {
+    super.customize(parameters, configuration);
+
+    LOGGER.info(
+        "SwingingDoorTrendingSamplingProcessor in {} is initialized with {}: 
{}, {}: {}, {}: {}.",
+        dataBaseNameWithPathSeparator,
+        PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY,
+        compressionDeviation,
+        PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY,
+        compressionMinTimeInterval,
+        PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY,
+        compressionMaxTimeInterval);
+  }
+
+  @Override
+  protected PartialPathLastObjectCache<?> initPathLastObjectCache(long 
memoryLimitInBytes) {
+    pathLastObjectCache =
+        new 
PartialPathLastObjectCache<SwingingDoorTrendingFilter<?>>(memoryLimitInBytes) {
+          @Override
+          protected long calculateMemoryUsage(SwingingDoorTrendingFilter<?> 
object) {
+            return 64; // Long.BYTES * 8
+          }
+        };
+    return pathLastObjectCache;
+  }
+
+  @Override
+  protected void processRow(
+      Row row,
+      RowCollector rowCollector,
+      String deviceSuffix,
+      AtomicReference<Exception> exception) {
+    final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow) 
row);
+
+    boolean hasNonNullMeasurements = false;
+    for (int i = 0, size = row.size(); i < size; i++) {
+      if (row.isNull(i)) {
+        continue;
+      }
+
+      final String timeSeriesSuffix =
+          deviceSuffix + TsFileConstant.PATH_SEPARATOR + row.getColumnName(i);
+      final SwingingDoorTrendingFilter filter =
+          pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix);
+
+      if (filter != null) {
+        if (filter.filter(row.getTime(), row.getObject(i))) {
+          hasNonNullMeasurements = true;
+        } else {
+          remarkableRow.markNull(i);
+        }
+      } else {
+        hasNonNullMeasurements = true;
+        pathLastObjectCache.setPartialPathLastObject(
+            timeSeriesSuffix,
+            new SwingingDoorTrendingFilter<>(this, row.getTime(), 
row.getObject(i)));
+      }
+    }
+
+    if (hasNonNullMeasurements) {
+      try {
+        rowCollector.collectRow(remarkableRow);
+      } catch (IOException e) {
+        exception.set(e);
+      }
+    }
+  }
+
+  double getCompressionDeviation() {
+    return compressionDeviation;
+  }
+
+  long getCompressionMinTimeInterval() {
+    return compressionMinTimeInterval;
+  }
+
+  long getCompressionMaxTimeInterval() {
+    return compressionMaxTimeInterval;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java
new file mode 100644
index 00000000000..3ce009d21d9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.tumbling;
+
+import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor;
+import 
org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache;
+import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_KEY;
+
+/**
+ * Down-sampling processor that forwards a row only when, for at least one of its non-null
+ * columns, no sample has been recorded yet or the time distance to the last recorded sample of
+ * that time series is at least the configured interval.
+ */
+public class TumblingTimeSamplingProcessor extends DownSamplingProcessor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TumblingTimeSamplingProcessor.class);
+
+  /** The sampling interval in seconds, exactly as configured and validated. */
+  private long intervalSeconds;
+
+  /**
+   * The sampling interval after conversion from seconds by {@link
+   * TimestampPrecisionUtils#convertToCurrPrecision}; this is the value compared against row
+   * timestamps.
+   */
+  private long intervalInCurrentPrecision;
+
+  /** Last sampled row time per time series, keyed by the time series suffix. */
+  private PartialPathLastObjectCache<Long> pathLastObjectCache;
+
+  /**
+   * Reads the interval (in seconds, falling back to the default), checks that it is positive and
+   * stores it both in seconds and converted for timestamp comparison.
+   *
+   * @param validator validator giving access to the pipe parameters
+   * @throws Exception if the parent validation fails or the interval is not greater than 0
+   */
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    super.validate(validator);
+
+    final long configuredSeconds =
+        validator
+            .getParameters()
+            .getLongOrDefault(
+                PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_KEY,
+                PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_DEFAULT_VALUE);
+    validator.validate(
+        seconds -> (Long) seconds > 0,
+        String.format(
+            "The value of %s must be greater than 0, but got %d.",
+            PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_KEY, configuredSeconds),
+        configuredSeconds);
+
+    intervalSeconds = configuredSeconds;
+    intervalInCurrentPrecision =
+        TimestampPrecisionUtils.convertToCurrPrecision(configuredSeconds, TimeUnit.SECONDS);
+  }
+
+  /** Delegates to the parent customization, then logs the effective settings. */
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) {
+    super.customize(parameters, configuration);
+
+    // Log the interval in seconds: the message labels the value with the "...interval-seconds"
+    // key and an "s" suffix, so the precision-converted value would be misleading here.
+    LOGGER.info(
+        "TumblingTimeSamplingProcessor in {} is initialized with {}: {}s, {}: {}, {}: {}.",
+        dataBaseNameWithPathSeparator,
+        PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_KEY,
+        intervalSeconds,
+        PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY,
+        memoryLimitInBytes,
+        PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY,
+        shouldSplitFile);
+  }
+
+  /**
+   * Creates the last-sample-time cache, remembers it in this processor and returns it.
+   *
+   * @param memoryLimitInBytes memory budget passed to the cache
+   * @return the newly created cache
+   */
+  @Override
+  protected PartialPathLastObjectCache<?> initPathLastObjectCache(long memoryLimitInBytes) {
+    pathLastObjectCache =
+        new PartialPathLastObjectCache<Long>(memoryLimitInBytes) {
+          @Override
+          protected long calculateMemoryUsage(Long object) {
+            return Long.BYTES;
+          }
+        };
+    return pathLastObjectCache;
+  }
+
+  /**
+   * Forwards the whole row at most once. The first non-null column whose time series is due for a
+   * new sample triggers the collection; on success the row time is recorded as the last sample
+   * time of that column and of every later non-null column.
+   *
+   * @param row the incoming row
+   * @param rowCollector collector receiving the row when it is sampled
+   * @param deviceSuffix device part of the time series suffix used as cache key prefix
+   * @param exception receives the exception raised while collecting or updating the cache
+   */
+  @Override
+  protected void processRow(
+      Row row,
+      RowCollector rowCollector,
+      String deviceSuffix,
+      AtomicReference<Exception> exception) {
+    final long currentRowTime = row.getTime();
+
+    for (int index = 0, size = row.size(); index < size; ++index) {
+      if (row.isNull(index)) {
+        continue;
+      }
+
+      final String timeSeriesSuffix =
+          deviceSuffix + TsFileConstant.PATH_SEPARATOR + row.getColumnName(index);
+      final Long lastSampleTime = pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix);
+
+      final boolean withinCurrentInterval =
+          lastSampleTime != null
+              && Math.abs(currentRowTime - lastSampleTime) < intervalInCurrentPrecision;
+      if (withinCurrentInterval) {
+        continue;
+      }
+
+      try {
+        rowCollector.collectRow(row);
+
+        pathLastObjectCache.setPartialPathLastObject(timeSeriesSuffix, currentRowTime);
+        for (int j = index + 1; j < size; ++j) {
+          if (!row.isNull(j)) {
+            pathLastObjectCache.setPartialPathLastObject(
+                deviceSuffix + TsFileConstant.PATH_SEPARATOR + row.getColumnName(j),
+                currentRowTime);
+          }
+        }
+      } catch (Exception e) {
+        exception.set(e);
+      }
+      // Stop after the first collect attempt, successful or not: retrying the same row for the
+      // remaining columns could collect it twice and would overwrite the recorded exception.
+      return;
+    }
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
index d5e58abea12..4811f342b0e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
@@ -25,9 +25,6 @@ public class PipeProcessorConstant {
 
   public static final String PROCESSOR_KEY = "processor";
 
-  public static final String PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY =
-      "processor.down-sampling.interval-seconds";
-  public static final long 
PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_DEFAULT_VALUE = 60;
   public static final String PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY =
       "processor.down-sampling.split-file";
   public static final boolean PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_DEFAULT_VALUE 
= false;
@@ -35,6 +32,20 @@ public class PipeProcessorConstant {
       "processor.down-sampling.memory-limit-in-bytes";
   public static final long 
PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE = 16 * MB;
 
+  public static final String PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_KEY =
+      "processor.tumbling-time.interval-seconds";
+  public static final long 
PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_DEFAULT_VALUE = 60;
+
+  public static final String PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY =
+      "processor.sdt.compression-deviation";
+  public static final double PROCESSOR_SDT_COMPRESSION_DEVIATION_DEFAULT_VALUE 
= 0;
+  public static final String PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY =
+      "processor.sdt.min-time-interval";
+  public static final long PROCESSOR_SDT_MIN_TIME_INTERVAL_DEFAULT_VALUE = 0;
+  public static final String PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY =
+      "processor.sdt.max-time-interval";
+  public static final long PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE = 
Long.MAX_VALUE;
+
   private PipeProcessorConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index cb55d5c5bc6..c1ae3a75f2b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -32,7 +32,8 @@ import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.writeback.WriteBac
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.extractor.donothing.DoNothingExtractor;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.extractor.iotdb.IoTDBExtractor;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor;
-import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.DownSamplingProcessor;
+import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
+import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -50,7 +51,9 @@ public enum BuiltinPipePlugin {
 
   // processors
   DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class),
-  DOWN_SAMPLING_PROCESSOR("down-sampling-processor", 
DownSamplingProcessor.class),
+  TUMBLING_TIME_SAMPLING_PROCESSOR(
+      "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class),
+  SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", 
SwingingDoorTrendingSamplingProcessor.class),
 
   // connectors
   DO_NOTHING_CONNECTOR("do-nothing-connector", DoNothingConnector.class),
@@ -108,7 +111,8 @@ public enum BuiltinPipePlugin {
                   // Sources
                   DO_NOTHING_SOURCE.getPipePluginName().toUpperCase(),
                   // Processors
-                  DOWN_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
+                  
TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
+                  SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
                   // Connectors
                   DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
                   IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/DownSamplingProcessor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/SwingingDoorTrendingSamplingProcessor.java
similarity index 75%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/DownSamplingProcessor.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/SwingingDoorTrendingSamplingProcessor.java
index 9742fadf38e..dd4df672105 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/DownSamplingProcessor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/SwingingDoorTrendingSamplingProcessor.java
@@ -22,9 +22,9 @@ package 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.PlaceHolderProcessor;
 
 /**
- * This class is a placeholder and should not be initialized. It represents 
the Down Sampling
- * processor. There is a real implementation in the server module but cannot 
be imported here. The
- * pipe agent in the server module will replace this class with the real 
implementation when
- * initializing the Down Sampling processor.
+ * This class is a placeholder and should not be initialized. It represents the
+ * sdt-sampling-processor. There is a real implementation in the server module 
but cannot be
+ * imported here. The pipe agent in the server module will replace this class 
with the real
+ * implementation when initializing the sdt-sampling-processor.
  */
-public class DownSamplingProcessor extends PlaceHolderProcessor {}
+public class SwingingDoorTrendingSamplingProcessor extends 
PlaceHolderProcessor {}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/DownSamplingProcessor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/TumblingTimeSamplingProcessor.java
similarity index 74%
rename from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/DownSamplingProcessor.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/TumblingTimeSamplingProcessor.java
index 9742fadf38e..e57df60d7b1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/DownSamplingProcessor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/TumblingTimeSamplingProcessor.java
@@ -22,9 +22,9 @@ package 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.PlaceHolderProcessor;
 
 /**
- * This class is a placeholder and should not be initialized. It represents 
the Down Sampling
- * processor. There is a real implementation in the server module but cannot 
be imported here. The
- * pipe agent in the server module will replace this class with the real 
implementation when
- * initializing the Down Sampling processor.
+ * This class is a placeholder and should not be initialized. It represents the
+ * tumbling-time-sampling-processor. There is a real implementation in the 
server module but cannot
+ * be imported here. The pipe agent in the server module will replace this 
class with the real
+ * implementation when initializing the tumbling-time-sampling-processor.
  */
-public class DownSamplingProcessor extends PlaceHolderProcessor {}
+public class TumblingTimeSamplingProcessor extends PlaceHolderProcessor {}

Reply via email to