This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aa547fb3889 Pipe: implemented sdt-sampling-processor & refactored
down-sampling-processor (#12114)
aa547fb3889 is described below
commit aa547fb38895d5cc0d95212d5adef76bcd021909
Author: Xuan Ronaldo <[email protected]>
AuthorDate: Thu Mar 14 17:12:22 2024 +0800
Pipe: implemented sdt-sampling-processor & refactored
down-sampling-processor (#12114)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java | 30 ++-
.../pipe/it/autocreate/IoTDBPipeProcessorIT.java | 6 +-
.../PipeDataRegionProcessorConstructor.java | 9 +-
.../pipe/event/common/row/PipeRemarkableRow.java | 55 ++++++
.../iotdb/db/pipe/event/common/row/PipeRow.java | 18 +-
.../downsampling/DownSamplingProcessor.java | 122 +++++--------
...eCache.java => PartialPathLastObjectCache.java} | 38 ++--
.../sdt/SwingingDoorTrendingFilter.java | 145 +++++++++++++++
.../sdt/SwingingDoorTrendingSamplingProcessor.java | 201 +++++++++++++++++++++
.../tumbling/TumblingTimeSamplingProcessor.java | 135 ++++++++++++++
.../config/constant/PipeProcessorConstant.java | 17 +-
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 10 +-
... => SwingingDoorTrendingSamplingProcessor.java} | 10 +-
...sor.java => TumblingTimeSamplingProcessor.java} | 10 +-
14 files changed, 667 insertions(+), 139 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
index cad1e28ef64..80c87688bc1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
@@ -141,7 +141,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
// alter pipe (replace)
try (Connection connection = senderEnv.getConnection();
Statement statement = connection.createStatement()) {
- statement.execute("alter pipe a2b replace processor
('processor'='down-sampling-processor')");
+ statement.execute(
+ "alter pipe a2b replace processor
('processor'='tumbling-time-sampling-processor')");
} catch (SQLException e) {
fail(e.getMessage());
}
@@ -155,7 +156,10 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
// check configurations
Assert.assertTrue(
-
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+ showPipeResult
+ .get(0)
+ .pipeProcessor
+ .contains("processor=tumbling-time-sampling-processor"));
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=false"));
Assert.assertTrue(
showPipeResult
@@ -187,7 +191,10 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
// check configurations
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
Assert.assertTrue(
-
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+ showPipeResult
+ .get(0)
+ .pipeProcessor
+ .contains("processor=tumbling-time-sampling-processor"));
Assert.assertTrue(
showPipeResult
.get(0)
@@ -218,7 +225,10 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
// check configurations
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
Assert.assertFalse(
-
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+ showPipeResult
+ .get(0)
+ .pipeProcessor
+ .contains("processor=tumbling-time-sampling-processor"));
Assert.assertTrue(
showPipeResult
.get(0)
@@ -249,7 +259,10 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
// check configurations
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
Assert.assertFalse(
-
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+ showPipeResult
+ .get(0)
+ .pipeProcessor
+ .contains("processor=tumbling-time-sampling-processor"));
Assert.assertTrue(
showPipeResult
.get(0)
@@ -299,7 +312,7 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
// create pipe
String sql =
String.format(
- "create pipe a2b with processor
('processor'='down-sampling-processor', 'down-sampling.interval-seconds'='1',
'down-sampling.split-file'='true') with sink ('node-urls'='%s',
'batch.enable'='false')",
+ "create pipe a2b with processor
('processor'='tumbling-time-sampling-processor',
'processor.tumbling-time.interval-seconds'='1',
'processor.down-sampling.split-file'='true') with sink ('node-urls'='%s',
'batch.enable'='false')",
receiverDataNode.getIpAndPortString());
try (Connection connection = senderEnv.getConnection();
Statement statement = connection.createStatement()) {
@@ -325,10 +338,11 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "select * from root.**", "Time,root.db.d1.at1,",
expectedResSet);
- // alter pipe (modify 'down-sampling.interval-seconds')
+ // alter pipe (modify 'processor.tumbling-time.interval-seconds')
try (Connection connection = senderEnv.getConnection();
Statement statement = connection.createStatement()) {
- statement.execute("alter pipe a2b modify processor
('down-sampling.interval-seconds'='2')");
+ statement.execute(
+ "alter pipe a2b modify processor
('processor.tumbling-time.interval-seconds'='2')");
} catch (SQLException e) {
fail(e.getMessage());
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
index 62d30818657..fe12fab2bf1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
@@ -62,7 +62,7 @@ public class IoTDBPipeProcessorIT extends
AbstractPipeDualAutoIT {
}
@Test
- public void testDownSamplingProcessor() throws Exception {
+ public void testTumblingTimeSamplingProcessor() throws Exception {
DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
String receiverIp = receiverDataNode.getIp();
@@ -87,8 +87,8 @@ public class IoTDBPipeProcessorIT extends
AbstractPipeDualAutoIT {
extractorAttributes.put("source.realtime.mode", "log");
- processorAttributes.put("processor", "down-sampling-processor");
- processorAttributes.put("processor.down-sampling.interval-seconds",
"20");
+ processorAttributes.put("processor", "tumbling-time-sampling-processor");
+ processorAttributes.put("processor.tumbling-time.interval-seconds",
"20");
processorAttributes.put("processor.down-sampling.split-file", "true");
connectorAttributes.put("sink", "iotdb-thrift-sink");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index 1cdbbb34221..ba49f55ff32 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -23,7 +23,8 @@ import
org.apache.iotdb.commons.pipe.agent.plugin.PipeProcessorConstructor;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor;
import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
-import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor;
+import
org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
+import
org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
@@ -36,6 +37,10 @@ class PipeDataRegionProcessorConstructor extends
PipeProcessorConstructor {
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(),
DoNothingProcessor::new);
pluginConstructors.put(
- BuiltinPipePlugin.DOWN_SAMPLING_PROCESSOR.getPipePluginName(),
DownSamplingProcessor::new);
+ BuiltinPipePlugin.TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName(),
+ TumblingTimeSamplingProcessor::new);
+ pluginConstructors.put(
+ BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(),
+ SwingingDoorTrendingSamplingProcessor::new);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRemarkableRow.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRemarkableRow.java
new file mode 100644
index 00000000000..3dcbb43feee
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRemarkableRow.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.row;
+
+/** A {@link PipeRow} that keeps its own per-column null flags so they can be raised locally. */
+public class PipeRemarkableRow extends PipeRow {
+  private final boolean[] isNullMarked;
+
+  /** Passes the fields of {@code pipeRow} to {@link PipeRow} and snapshots its null flags. */
+  public PipeRemarkableRow(PipeRow pipeRow) {
+    super(
+        pipeRow.rowIndex,
+        pipeRow.deviceId,
+        pipeRow.isAligned,
+        pipeRow.measurementSchemaList,
+        pipeRow.timestampColumn,
+        pipeRow.valueColumnTypes,
+        pipeRow.valueColumns,
+        pipeRow.bitMaps,
+        pipeRow.columnNameStringList);
+
+    isNullMarked = new boolean[pipeRow.measurementSchemaList.length];
+    for (int column = 0; column < isNullMarked.length; ++column) {
+      isNullMarked[column] = super.isNull(column);
+    }
+  }
+
+  /** Returns the flag held by this row, which {@link #markNull(int)} may have raised. */
+  @Override
+  public boolean isNull(int index) {
+    return isNullMarked[index];
+  }
+
+  /** Raises the null flag of column {@code index}; only this row's own flag array is written. */
+  public void markNull(int index) {
+    isNullMarked[index] = true;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
index 87f36db8191..eb872283862 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
@@ -33,18 +33,18 @@ import java.util.List;
public class PipeRow implements Row {
- private final int rowIndex;
+ protected final int rowIndex;
- private final String deviceId;
- private final boolean isAligned;
- private final MeasurementSchema[] measurementSchemaList;
+ protected final String deviceId;
+ protected final boolean isAligned;
+ protected final MeasurementSchema[] measurementSchemaList;
- private final long[] timestampColumn;
- private final TSDataType[] valueColumnTypes;
- private final Object[] valueColumns;
- private final BitMap[] bitMaps;
+ protected final long[] timestampColumn;
+ protected final TSDataType[] valueColumnTypes;
+ protected final Object[] valueColumns;
+ protected final BitMap[] bitMaps;
- private final String[] columnNameStringList;
+ protected final String[] columnNameStringList;
public PipeRow(
int rowIndex,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
index 8b7278d0e0a..225ce14efe4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
@@ -24,7 +24,6 @@ import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeE
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.storageengine.StorageEngine;
-import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -37,73 +36,63 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY;
-public class DownSamplingProcessor implements PipeProcessor {
+public abstract class DownSamplingProcessor implements PipeProcessor {
+
+ protected long memoryLimitInBytes;
- private static final Logger LOGGER =
LoggerFactory.getLogger(DownSamplingProcessor.class);
+ protected boolean shouldSplitFile;
- private String dataBaseNameWithPathSeparator;
- private long intervalInCurrentPrecision;
- private boolean shouldSplitFile;
+ protected String dataBaseNameWithPathSeparator;
- private PartialPathLastTimeCache partialPathLastTimeCache;
+ protected PartialPathLastObjectCache<?> pathLastObjectCache;
@Override
public void validate(PipeParameterValidator validator) throws Exception {
- // No need to validate.
+ memoryLimitInBytes =
+ validator
+ .getParameters()
+ .getLongOrDefault(
+ PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY,
+ PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE);
+
+ validator.validate(
+ memoryLimitInBytes -> (Long) memoryLimitInBytes > 0,
+ String.format(
+ "%s must be > 0, but got %s",
+ PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY,
memoryLimitInBytes),
+ memoryLimitInBytes);
}
@Override
- public void customize(PipeParameters parameters,
PipeProcessorRuntimeConfiguration configuration)
- throws Exception {
- final String dataBaseName =
- StorageEngine.getInstance()
- .getDataRegion(
- new DataRegionId(
- ((PipeTaskProcessorRuntimeEnvironment)
configuration.getRuntimeEnvironment())
- .getRegionId()))
- .getDatabaseName();
- final long intervalSeconds =
- parameters.getLongOrDefault(
- PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY,
- PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_DEFAULT_VALUE);
- final long memoryLimitInBytes =
- parameters.getLongOrDefault(
- PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY,
- PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE);
+ public void customize(
+ PipeParameters parameters, PipeProcessorRuntimeConfiguration
configuration) {
shouldSplitFile =
parameters.getBooleanOrDefault(
PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY,
PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_DEFAULT_VALUE);
- LOGGER.info(
- "DownSamplingProcessor in {} is initialized with {}: {}s, {}: {}, {}:
{}.",
- dataBaseName,
- PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY,
- intervalSeconds,
- PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY,
- memoryLimitInBytes,
- PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY,
- shouldSplitFile);
-
- dataBaseNameWithPathSeparator = dataBaseName +
TsFileConstant.PATH_SEPARATOR;
- intervalInCurrentPrecision =
- TimestampPrecisionUtils.convertToCurrPrecision(intervalSeconds,
TimeUnit.SECONDS);
-
- partialPathLastTimeCache = new
PartialPathLastTimeCache(memoryLimitInBytes);
+
+ dataBaseNameWithPathSeparator =
+ StorageEngine.getInstance()
+ .getDataRegion(
+ new DataRegionId(
+ ((PipeTaskProcessorRuntimeEnvironment)
+ configuration.getRuntimeEnvironment())
+ .getRegionId()))
+ .getDatabaseName()
+ + TsFileConstant.PATH_SEPARATOR;
+
+ pathLastObjectCache = initPathLastObjectCache(memoryLimitInBytes);
}
+ protected abstract PartialPathLastObjectCache<?>
initPathLastObjectCache(long memoryLimitInBytes);
+
@Override
public void process(TabletInsertionEvent tabletInsertionEvent,
EventCollector eventCollector)
throws Exception {
@@ -122,7 +111,8 @@ public class DownSamplingProcessor implements PipeProcessor
{
// To reduce the memory usage, we use the device suffix
// instead of the full path as the key.
if (deviceSuffix.get() == null) {
-
deviceSuffix.set(row.getDeviceId().replaceFirst(dataBaseNameWithPathSeparator,
""));
+ deviceSuffix.set(
+
row.getDeviceId().replaceFirst(this.dataBaseNameWithPathSeparator, ""));
}
processRow(row, rowCollector, deviceSuffix.get(), exception);
@@ -141,41 +131,11 @@ public class DownSamplingProcessor implements
PipeProcessor {
}
}
- private void processRow(
+ protected abstract void processRow(
Row row,
RowCollector rowCollector,
String deviceSuffix,
- AtomicReference<Exception> exception) {
- boolean hasNonNullMeasurements = false;
-
- for (int index = 0, size = row.size(); index < size; ++index) {
- if (row.isNull(index)) {
- continue;
- }
-
- final String timeSeriesSuffix =
- deviceSuffix + TsFileConstant.PATH_SEPARATOR +
row.getColumnName(index);
- final Long lastSampleTime =
partialPathLastTimeCache.getPartialPathLastTime(timeSeriesSuffix);
-
- if (lastSampleTime != null) {
- if (Math.abs(row.getTime() - lastSampleTime) >=
intervalInCurrentPrecision) {
- hasNonNullMeasurements = true;
- partialPathLastTimeCache.setPartialPathLastTime(timeSeriesSuffix,
row.getTime());
- }
- } else {
- hasNonNullMeasurements = true;
- partialPathLastTimeCache.setPartialPathLastTime(timeSeriesSuffix,
row.getTime());
- }
- }
-
- if (hasNonNullMeasurements) {
- try {
- rowCollector.collectRow(row);
- } catch (Exception e) {
- exception.set(e);
- }
- }
- }
+ AtomicReference<Exception> exception);
/**
* If data comes in {@link TsFileInsertionEvent}, we will not split it into
{@link
@@ -207,8 +167,8 @@ public class DownSamplingProcessor implements PipeProcessor
{
@Override
public void close() throws Exception {
- if (partialPathLastTimeCache != null) {
- partialPathLastTimeCache.close();
+ if (pathLastObjectCache != null) {
+ pathLastObjectCache.close();
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastTimeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
similarity index 77%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastTimeCache.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
index 226dbe361f5..bbee9babed9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastTimeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
@@ -30,21 +30,17 @@ import com.google.common.util.concurrent.AtomicDouble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Map-like component to look up for the last chosen time of a timeSeries. It
has max size and
- * timeSeries may fail to find its last time and must design the logic to
handle this.
- */
-public class PartialPathLastTimeCache implements AutoCloseable {
+public abstract class PartialPathLastObjectCache<T> implements AutoCloseable {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PartialPathLastTimeCache.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PartialPathLastObjectCache.class);
private final PipeMemoryBlock allocatedMemoryBlock;
// Used to adjust the memory usage of the cache
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
- private final Cache<String, Long> partialPath2TimeCache;
+ private final Cache<String, T> partialPath2ObjectCache;
- public PartialPathLastTimeCache(long memoryLimitInBytes) {
+ public PartialPathLastObjectCache(long memoryLimitInBytes) {
allocatedMemoryBlock =
PipeResourceManager.memory()
.tryAllocate(memoryLimitInBytes)
@@ -54,7 +50,7 @@ public class PartialPathLastTimeCache implements
AutoCloseable {
memoryUsageCheatFactor.set(
memoryUsageCheatFactor.get() * ((double) oldMemory /
newMemory));
LOGGER.info(
- "PartialPathLastTimeCache.allocatedMemoryBlock has
shrunk from {} to {}.",
+ "PartialPathLastObjectCache.allocatedMemoryBlock has
shrunk from {} to {}.",
oldMemory,
newMemory);
})
@@ -64,23 +60,23 @@ public class PartialPathLastTimeCache implements
AutoCloseable {
memoryUsageCheatFactor.set(
memoryUsageCheatFactor.get() / ((double) newMemory /
oldMemory));
LOGGER.info(
- "PartialPathLastTimeCache.allocatedMemoryBlock has
expanded from {} to {}.",
+ "PartialPathLastObjectCache.allocatedMemoryBlock has
expanded from {} to {}.",
oldMemory,
newMemory);
});
// Currently disable the metric here because it's not a constant cache and
the number may
// fluctuate. In the future all the "processorCache"s may be recorded in
single metric entry
- partialPath2TimeCache =
+ partialPath2ObjectCache =
Caffeine.newBuilder()
.maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
.weigher(
- // Here partial path is a part of full path adequate to
inspect the last time
- (Weigher<String, Long>)
- (partialPath, timeStamp) -> {
+ // Here partial path is a part of full path adequate to
inspect the last object
+ (Weigher<String, T>)
+ (partialPath, object) -> {
final long weightInLong =
(long)
- ((MemUtils.getStringMem(partialPath) +
Long.BYTES)
+ ((MemUtils.getStringMem(partialPath) +
calculateMemoryUsage(object))
* memoryUsageCheatFactor.get());
if (weightInLong <= 0) {
return Integer.MAX_VALUE;
@@ -91,21 +87,23 @@ public class PartialPathLastTimeCache implements
AutoCloseable {
.build();
}
+ protected abstract long calculateMemoryUsage(T object);
+
/////////////////////////// Getter & Setter ///////////////////////////
- public Long getPartialPathLastTime(String partialPath) {
- return partialPath2TimeCache.getIfPresent(partialPath);
+ public T getPartialPathLastObject(String partialPath) {
+ return partialPath2ObjectCache.getIfPresent(partialPath);
}
- public void setPartialPathLastTime(String partialPath, long timeStamp) {
- partialPath2TimeCache.put(partialPath, timeStamp);
+ public void setPartialPathLastObject(String partialPath, T object) {
+ partialPath2ObjectCache.put(partialPath, object);
}
/////////////////////////// Close ///////////////////////////
@Override
public void close() throws Exception {
- partialPath2TimeCache.invalidateAll();
+ partialPath2ObjectCache.invalidateAll();
allocatedMemoryBlock.close();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
new file mode 100644
index 00000000000..f80b189427a
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.sdt;
+
+import java.util.Objects;
+
+/**
+ * Swinging door trending (SDT) filter state: decides, point by point, whether an incoming point is
+ * selected. Deviation and min/max time interval are read from the given processor on every call.
+ */
+public class SwingingDoorTrendingFilter<T> {
+
+  private final SwingingDoorTrendingSamplingProcessor processor;
+
+  /** Largest upper slope seen since the last stored point; the upper door can only open up. */
+  private double upperDoor;
+
+  /** Smallest lower slope seen since the last stored point; the lower door can only open down. */
+  private double lowerDoor;
+
+  /** Latest point seen by the slope comparison or a reset; it is stored when the doors cross. */
+  private long lastReadTimestamp;
+
+  private T lastReadValue;
+
+  /** Last stored point; time intervals and slopes of incoming points are measured against it. */
+  private long lastStoredTimestamp;
+
+  private T lastStoredValue;
+
+  public SwingingDoorTrendingFilter(
+      SwingingDoorTrendingSamplingProcessor processor, long firstTimestamp, T firstValue) {
+    this.processor = processor;
+    reset(firstTimestamp, firstValue);
+  }
+
+  /**
+   * Feeds one point to the filter and updates its state.
+   *
+   * @return {@code true} if the point restarts the filter or makes the doors cross (presumably
+   *     the caller then keeps the point), {@code false} if it is filtered out. A point whose
+   *     evaluation throws restarts the filter and yields {@code true}.
+   */
+  public boolean filter(long timestamp, T value) {
+    try {
+      return tryFilter(timestamp, value);
+    } catch (Exception e) {
+      // Deliberate best effort: a point that cannot be evaluated (e.g. its text is not a number,
+      // or the value is null) is reported as selected and the filter restarts from it.
+      reset(timestamp, value);
+      return true;
+    }
+  }
+
+  private boolean tryFilter(long timestamp, T value) {
+    final long timeDiff = timestamp - lastStoredTimestamp;
+    final long absTimeDiff = Math.abs(timeDiff);
+
+    // Too close to the last stored point: filter it out without touching any state
+    if (absTimeDiff <= processor.getCompressionMinTimeInterval()) {
+      return false;
+    }
+
+    // Too far from the last stored point: restart the filter at this point
+    if (absTimeDiff >= processor.getCompressionMaxTimeInterval()) {
+      reset(timestamp, value);
+      return true;
+    }
+
+    // For boolean and string type, we only compare the value
+    if (value instanceof Boolean || value instanceof String) {
+      if (Objects.equals(lastStoredValue, value)) {
+        return false;
+      }
+
+      reset(timestamp, value);
+      return true;
+    }
+
+    // For other numerical types, we compare the value and the time difference
+    final double doubleValue = Double.parseDouble(value.toString());
+    final double lastStoredDoubleValue = Double.parseDouble(lastStoredValue.toString());
+    final double valueDiff = doubleValue - lastStoredDoubleValue;
+
+    final double currentUpperSlope = (valueDiff - processor.getCompressionDeviation()) / timeDiff;
+    if (currentUpperSlope > upperDoor) {
+      upperDoor = currentUpperSlope;
+    }
+
+    final double currentLowerSlope = (valueDiff + processor.getCompressionDeviation()) / timeDiff;
+    if (currentLowerSlope < lowerDoor) {
+      lowerDoor = currentLowerSlope;
+    }
+
+    if (upperDoor > lowerDoor) {
+      // Doors crossed: the previously read point becomes the new stored point
+      lastStoredTimestamp = lastReadTimestamp;
+      lastStoredValue = lastReadValue;
+
+      // NOTE(review): both slopes were measured from the old stored point - confirm intent
+      upperDoor = currentUpperSlope;
+      lowerDoor = currentLowerSlope;
+
+      lastReadValue = value;
+      lastReadTimestamp = timestamp;
+
+      return true;
+    }
+
+    lastReadValue = value;
+    lastReadTimestamp = timestamp;
+
+    return false;
+  }
+
+  /** Restarts the filter at the given point: clears both doors, point is both read and stored. */
+  private void reset(long timestamp, T value) {
+    // -Double.MAX_VALUE, not Double.MIN_VALUE (smallest positive double): slopes may be negative
+    upperDoor = -Double.MAX_VALUE;
+    lowerDoor = Double.MAX_VALUE;
+
+    lastReadTimestamp = timestamp;
+    lastReadValue = value;
+
+    lastStoredTimestamp = timestamp;
+    lastStoredValue = value;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java
new file mode 100644
index 00000000000..ebfde47671e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingSamplingProcessor.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.sdt;
+
+import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRemarkableRow;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
+import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor;
+import
org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache;
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Down-sampling processor that applies a swinging-door-trending (SDT) filter to each time series.
+ *
+ * <p>One {@link SwingingDoorTrendingFilter} is kept per time series suffix in a
+ * {@link PartialPathLastObjectCache}. For every incoming row, each non-null measurement is offered
+ * to its filter; measurements the filter rejects are flagged through
+ * {@link PipeRemarkableRow#markNull}, and the row is collected only if at least one measurement
+ * was retained.
+ */
+public class SwingingDoorTrendingSamplingProcessor extends
DownSamplingProcessor {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(SwingingDoorTrendingSamplingProcessor.class);
+
+ /**
+ * The maximum absolute deviation configured by the user. A data point whose value stays within
+ * this deviation is compressed away (discarded); only out-of-range (time, value) points are
+ * stored to form the trend.
+ */
+ private double compressionDeviation;
+
+ /**
+ * The minimum time distance between two stored data points. If the time distance from the
+ * current point to the last stored point is <= compressionMinTimeInterval, the current point
+ * will NOT be stored, regardless of the compression deviation.
+ */
+ private long compressionMinTimeInterval;
+
+ /**
+ * The maximum time distance between two stored data points. If the time distance from the
+ * current point to the last stored point is >= compressionMaxTimeInterval, the current point
+ * will be stored, regardless of the compression deviation.
+ */
+ private long compressionMaxTimeInterval;
+
+ // One SDT filter per time series suffix; created in initPathLastObjectCache.
+ private PartialPathLastObjectCache<SwingingDoorTrendingFilter<?>>
pathLastObjectCache;
+
+ /**
+ * Reads the SDT parameters (falling back to their defaults) into this processor and checks that
+ * the deviation and both time intervals are >= 0 and that the minimum interval does not exceed
+ * the maximum interval.
+ *
+ * @param validator gives access to the pipe parameters and performs the checks
+ * @throws Exception propagated from the parent validation or from the validator
+ */
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ super.validate(validator);
+
+ final PipeParameters parameters = validator.getParameters();
+ compressionDeviation =
+ parameters.getDoubleOrDefault(
+ PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY,
+
PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_DEFAULT_VALUE);
+ compressionMinTimeInterval =
+ parameters.getLongOrDefault(
+ PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY,
+
PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_DEFAULT_VALUE);
+ compressionMaxTimeInterval =
+ parameters.getLongOrDefault(
+ PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY,
+
PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE);
+
+ // Note: the lambda parameters below shadow nothing; they receive the value passed as the last
+ // argument of each validate(...) call, which is why a cast from Object is needed.
+ validator
+ .validate(
+ compressionDeviation -> (Double) compressionDeviation >= 0,
+ String.format(
+ "%s must be >= 0, but got %s",
+ PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY,
+ compressionDeviation),
+ compressionDeviation)
+ .validate(
+ compressionMinTimeInterval -> (Long) compressionMinTimeInterval >=
0,
+ String.format(
+ "%s must be >= 0, but got %s",
+ PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY,
+ compressionMinTimeInterval),
+ compressionMinTimeInterval)
+ .validate(
+ compressionMaxTimeInterval -> (Long) compressionMaxTimeInterval >=
0,
+ String.format(
+ "%s must be >= 0, but got %s",
+ PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY,
+ compressionMaxTimeInterval),
+ compressionMaxTimeInterval)
+ .validate(
+ minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1],
+ String.format(
+ "%s must be <= %s, but got %s and %s",
+ PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY,
+ PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY,
+ compressionMinTimeInterval,
+ compressionMaxTimeInterval),
+ compressionMinTimeInterval,
+ compressionMaxTimeInterval);
+ }
+
+ /**
+ * Delegates to the parent customization and then logs the effective SDT settings.
+ *
+ * @param parameters the pipe parameters, forwarded to the parent
+ * @param configuration the runtime configuration, forwarded to the parent
+ */
+ @Override
+ public void customize(
+ PipeParameters parameters, PipeProcessorRuntimeConfiguration
configuration) {
+ super.customize(parameters, configuration);
+
+ LOGGER.info(
+ "SwingingDoorTrendingSamplingProcessor in {} is initialized with {}:
{}, {}: {}, {}: {}.",
+ dataBaseNameWithPathSeparator,
+ PipeProcessorConstant.PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY,
+ compressionDeviation,
+ PipeProcessorConstant.PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY,
+ compressionMinTimeInterval,
+ PipeProcessorConstant.PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY,
+ compressionMaxTimeInterval);
+ }
+
+ /**
+ * Creates the per-time-series filter cache, keeps a typed reference to it in this class and
+ * returns it to the caller.
+ *
+ * @param memoryLimitInBytes memory budget handed to the cache
+ * @return the newly created cache
+ */
+ @Override
+ protected PartialPathLastObjectCache<?> initPathLastObjectCache(long
memoryLimitInBytes) {
+ pathLastObjectCache =
+ new
PartialPathLastObjectCache<SwingingDoorTrendingFilter<?>>(memoryLimitInBytes) {
+ @Override
+ protected long calculateMemoryUsage(SwingingDoorTrendingFilter<?>
object) {
+ // Fixed per-filter estimate; the actual object is not inspected.
+ return 64; // Long.BYTES * 8
+ }
+ };
+ return pathLastObjectCache;
+ }
+
+ /**
+ * Runs every non-null measurement of the row through the SDT filter of its time series and
+ * collects the row if at least one measurement is retained.
+ *
+ * @param row the incoming row; it is cast to {@link PipeRow}
+ * @param rowCollector receives the (possibly partially nulled) row
+ * @param deviceSuffix device part of the cache key; the column name is appended to it
+ * @param exception set to the {@link IOException} thrown by the collector, if any
+ */
+ @Override
+ protected void processRow(
+ Row row,
+ RowCollector rowCollector,
+ String deviceSuffix,
+ AtomicReference<Exception> exception) {
+ final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow)
row);
+
+ boolean hasNonNullMeasurements = false;
+ for (int i = 0, size = row.size(); i < size; i++) {
+ if (row.isNull(i)) {
+ continue;
+ }
+
+ final String timeSeriesSuffix =
+ deviceSuffix + TsFileConstant.PATH_SEPARATOR + row.getColumnName(i);
+ // Raw type on purpose: the cache stores SwingingDoorTrendingFilter<?>, and a wildcard-typed
+ // reference could not accept the Object returned by row.getObject(i).
+ final SwingingDoorTrendingFilter filter =
+ pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix);
+
+ if (filter != null) {
+ if (filter.filter(row.getTime(), row.getObject(i))) {
+ hasNonNullMeasurements = true;
+ } else {
+ // Rejected by the filter: flag this column on the outgoing row.
+ remarkableRow.markNull(i);
+ }
+ } else {
+ // No filter cached for this time series: keep the point and seed a new filter with it.
+ hasNonNullMeasurements = true;
+ pathLastObjectCache.setPartialPathLastObject(
+ timeSeriesSuffix,
+ new SwingingDoorTrendingFilter<>(this, row.getTime(),
row.getObject(i)));
+ }
+ }
+
+ // Rows in which every measurement was filtered out are dropped entirely.
+ if (hasNonNullMeasurements) {
+ try {
+ rowCollector.collectRow(remarkableRow);
+ } catch (IOException e) {
+ // Reported through the shared holder instead of being thrown from here.
+ exception.set(e);
+ }
+ }
+ }
+
+ /** Returns the configured compression deviation. */
+ double getCompressionDeviation() {
+ return compressionDeviation;
+ }
+
+ /** Returns the configured minimum time interval between two stored points. */
+ long getCompressionMinTimeInterval() {
+ return compressionMinTimeInterval;
+ }
+
+ /** Returns the configured maximum time interval between two stored points. */
+ long getCompressionMaxTimeInterval() {
+ return compressionMaxTimeInterval;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java
new file mode 100644
index 00000000000..3ce009d21d9
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.tumbling;
+
+import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor;
+import
org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache;
+import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_KEY;
+
+/**
+ * Down-sampling processor that forwards a row only when, for at least one of its non-null
+ * columns, no sample time is cached yet or the row time is at least one configured interval away
+ * from the cached sample time of that column.
+ */
+public class TumblingTimeSamplingProcessor extends DownSamplingProcessor {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TumblingTimeSamplingProcessor.class);
+
+  /** Sampling interval in seconds, exactly as configured; kept so it can be logged correctly. */
+  private long intervalSeconds;
+
+  /** Sampling interval after {@link TimestampPrecisionUtils#convertToCurrPrecision}. */
+  private long intervalInCurrentPrecision;
+
+  /** Last sampled row time per time series suffix; created in initPathLastObjectCache. */
+  private PartialPathLastObjectCache<Long> pathLastObjectCache;
+
+  /**
+   * Reads the interval (in seconds, falling back to the default), checks that it is strictly
+   * positive and converts it to the current timestamp precision.
+   *
+   * @param validator gives access to the pipe parameters and performs the check
+   * @throws Exception propagated from the parent validation or from the validator
+   */
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    super.validate(validator);
+
+    intervalSeconds =
+        validator
+            .getParameters()
+            .getLongOrDefault(
+                PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_KEY,
+                PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_DEFAULT_VALUE);
+    validator.validate(
+        seconds -> (Long) seconds > 0,
+        String.format(
+            "The value of %s must be greater than 0, but got %d.",
+            PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_KEY, intervalSeconds),
+        intervalSeconds);
+    intervalInCurrentPrecision =
+        TimestampPrecisionUtils.convertToCurrPrecision(intervalSeconds, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Delegates to the parent customization and then logs the effective settings.
+   *
+   * @param parameters the pipe parameters, forwarded to the parent
+   * @param configuration the runtime configuration, forwarded to the parent
+   */
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) {
+    super.customize(parameters, configuration);
+
+    // Log the interval in seconds: the message labels the value with the "interval-seconds" key
+    // and an "s" suffix, so printing the precision-converted interval here would be misleading.
+    LOGGER.info(
+        "TumblingTimeSamplingProcessor in {} is initialized with {}: {}s, {}: {}, {}: {}.",
+        dataBaseNameWithPathSeparator,
+        PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_KEY,
+        intervalSeconds,
+        PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY,
+        memoryLimitInBytes,
+        PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY,
+        shouldSplitFile);
+  }
+
+  /**
+   * Creates the cache of last sample times, keeps a typed reference to it in this class and
+   * returns it to the caller.
+   *
+   * @param memoryLimitInBytes memory budget handed to the cache
+   * @return the newly created cache
+   */
+  @Override
+  protected PartialPathLastObjectCache<?> initPathLastObjectCache(long memoryLimitInBytes) {
+    pathLastObjectCache =
+        new PartialPathLastObjectCache<Long>(memoryLimitInBytes) {
+          @Override
+          protected long calculateMemoryUsage(Long object) {
+            return Long.BYTES;
+          }
+        };
+    return pathLastObjectCache;
+  }
+
+  /**
+   * Collects the row as soon as one non-null column is due for sampling, then records the row
+   * time as the last sample time of that column and of every later non-null column.
+   *
+   * @param row the incoming row
+   * @param rowCollector receives the row when it is sampled
+   * @param deviceSuffix device part of the cache key; the column name is appended to it
+   * @param exception set to any exception thrown while collecting or updating the cache
+   */
+  @Override
+  protected void processRow(
+      Row row,
+      RowCollector rowCollector,
+      String deviceSuffix,
+      AtomicReference<Exception> exception) {
+    for (int index = 0, size = row.size(); index < size; ++index) {
+      if (row.isNull(index)) {
+        continue;
+      }
+
+      final String timeSeriesSuffix =
+          deviceSuffix + TsFileConstant.PATH_SEPARATOR + row.getColumnName(index);
+      final long currentRowTime = row.getTime();
+      final Long lastSampleTime = pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix);
+
+      if (lastSampleTime == null
+          || Math.abs(currentRowTime - lastSampleTime) >= intervalInCurrentPrecision) {
+        try {
+          rowCollector.collectRow(row);
+
+          // The whole row was emitted, so this column and all remaining non-null columns are
+          // considered sampled at the row time. Earlier columns were not due and stay untouched.
+          pathLastObjectCache.setPartialPathLastObject(timeSeriesSuffix, currentRowTime);
+          for (int j = index + 1; j < size; ++j) {
+            if (!row.isNull(j)) {
+              pathLastObjectCache.setPartialPathLastObject(
+                  deviceSuffix + TsFileConstant.PATH_SEPARATOR + row.getColumnName(j),
+                  currentRowTime);
+            }
+          }
+          return;
+        } catch (Exception e) {
+          // Reported through the shared holder; the loop then goes on to the next column.
+          exception.set(e);
+        }
+      }
+    }
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
index d5e58abea12..4811f342b0e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
@@ -25,9 +25,6 @@ public class PipeProcessorConstant {
public static final String PROCESSOR_KEY = "processor";
- public static final String PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY =
- "processor.down-sampling.interval-seconds";
- public static final long
PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_DEFAULT_VALUE = 60;
public static final String PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY =
"processor.down-sampling.split-file";
public static final boolean PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_DEFAULT_VALUE
= false;
@@ -35,6 +32,20 @@ public class PipeProcessorConstant {
"processor.down-sampling.memory-limit-in-bytes";
public static final long
PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE = 16 * MB;
+ public static final String PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_KEY =
+ "processor.tumbling-time.interval-seconds";
+ public static final long
PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_DEFAULT_VALUE = 60;
+
+ public static final String PROCESSOR_SDT_COMPRESSION_DEVIATION_KEY =
+ "processor.sdt.compression-deviation";
+ public static final double PROCESSOR_SDT_COMPRESSION_DEVIATION_DEFAULT_VALUE
= 0;
+ public static final String PROCESSOR_SDT_MIN_TIME_INTERVAL_KEY =
+ "processor.sdt.min-time-interval";
+ public static final long PROCESSOR_SDT_MIN_TIME_INTERVAL_DEFAULT_VALUE = 0;
+ public static final String PROCESSOR_SDT_MAX_TIME_INTERVAL_KEY =
+ "processor.sdt.max-time-interval";
+ public static final long PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE =
Long.MAX_VALUE;
+
private PipeProcessorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index cb55d5c5bc6..c1ae3a75f2b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -32,7 +32,8 @@ import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.writeback.WriteBac
import
org.apache.iotdb.commons.pipe.plugin.builtin.extractor.donothing.DoNothingExtractor;
import
org.apache.iotdb.commons.pipe.plugin.builtin.extractor.iotdb.IoTDBExtractor;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor;
-import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.DownSamplingProcessor;
+import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
+import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
import java.util.Arrays;
import java.util.Collections;
@@ -50,7 +51,9 @@ public enum BuiltinPipePlugin {
// processors
DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class),
- DOWN_SAMPLING_PROCESSOR("down-sampling-processor",
DownSamplingProcessor.class),
+ TUMBLING_TIME_SAMPLING_PROCESSOR(
+ "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class),
+ SDT_SAMPLING_PROCESSOR("sdt-sampling-processor",
SwingingDoorTrendingSamplingProcessor.class),
// connectors
DO_NOTHING_CONNECTOR("do-nothing-connector", DoNothingConnector.class),
@@ -108,7 +111,8 @@ public enum BuiltinPipePlugin {
// Sources
DO_NOTHING_SOURCE.getPipePluginName().toUpperCase(),
// Processors
- DOWN_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
+
TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
+ SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
// Connectors
DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/DownSamplingProcessor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/SwingingDoorTrendingSamplingProcessor.java
similarity index 75%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/DownSamplingProcessor.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/SwingingDoorTrendingSamplingProcessor.java
index 9742fadf38e..dd4df672105 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/DownSamplingProcessor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/SwingingDoorTrendingSamplingProcessor.java
@@ -22,9 +22,9 @@ package
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.PlaceHolderProcessor;
/**
- * This class is a placeholder and should not be initialized. It represents
the Down Sampling
- * processor. There is a real implementation in the server module but cannot
be imported here. The
- * pipe agent in the server module will replace this class with the real
implementation when
- * initializing the Down Sampling processor.
+ * This class is a placeholder and should not be initialized. It represents the
+ * sdt-sampling-processor. There is a real implementation in the server module
but cannot be
+ * imported here. The pipe agent in the server module will replace this class
with the real
+ * implementation when initializing the sdt-sampling-processor.
*/
-public class DownSamplingProcessor extends PlaceHolderProcessor {}
+public class SwingingDoorTrendingSamplingProcessor extends
PlaceHolderProcessor {}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/DownSamplingProcessor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/TumblingTimeSamplingProcessor.java
similarity index 74%
rename from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/DownSamplingProcessor.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/TumblingTimeSamplingProcessor.java
index 9742fadf38e..e57df60d7b1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/DownSamplingProcessor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/TumblingTimeSamplingProcessor.java
@@ -22,9 +22,9 @@ package
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.PlaceHolderProcessor;
/**
- * This class is a placeholder and should not be initialized. It represents
the Down Sampling
- * processor. There is a real implementation in the server module but cannot
be imported here. The
- * pipe agent in the server module will replace this class with the real
implementation when
- * initializing the Down Sampling processor.
+ * This class is a placeholder and should not be initialized. It represents the
+ * tumbling-time-sampling-processor. There is a real implementation in the
server module but cannot
+ * be imported here. The pipe agent in the server module will replace this
class with the real
+ * implementation when initializing the tumbling-time-sampling-processor.
*/
-public class DownSamplingProcessor extends PlaceHolderProcessor {}
+public class TumblingTimeSamplingProcessor extends PlaceHolderProcessor {}