This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9d564e33618 Pipe: implemented ChangingValueSamplingProcessor (#12466)
9d564e33618 is described below
commit 9d564e336183eac5bae5ef0f8281525430add690
Author: ppppoooo <[email protected]>
AuthorDate: Mon May 20 19:38:07 2024 +0800
Pipe: implemented ChangingValueSamplingProcessor (#12466)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../PipeDataRegionProcessorConstructor.java | 4 +
.../downsampling/changing/ChangingValueFilter.java | 95 ++++++++++
.../changing/ChangingValueSamplingProcessor.java | 199 +++++++++++++++++++++
.../sdt/SwingingDoorTrendingFilter.java | 4 +-
.../config/constant/PipeProcessorConstant.java | 11 ++
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 4 +
.../ChangingValueSamplingProcessor.java | 30 ++++
7 files changed, 346 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index 15578f2fc2a..6d747d156a2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor;
import
org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor;
import
org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor;
+import
org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor;
import
org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
import
org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
import
org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor;
@@ -47,6 +48,9 @@ class PipeDataRegionProcessorConstructor extends
PipeProcessorConstructor {
pluginConstructors.put(
BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(),
SwingingDoorTrendingSamplingProcessor::new);
+ pluginConstructors.put(
+
BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(),
+ ChangingValueSamplingProcessor::new);
pluginConstructors.put(
BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(),
ThrowingExceptionProcessor::new);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java
new file mode 100644
index 00000000000..cc83fbeb21e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.changing;
+
+import org.apache.iotdb.pipe.api.type.Binary;
+
+import java.util.Objects;
+
+public class ChangingValueFilter<T> {
+
+ private final ChangingValueSamplingProcessor processor;
+
+ /**
+ * The timestamp and value of the last stored point. Each incoming point is compared against
+ * them to decide whether it is stored or filtered out.
+ */
+ private long lastStoredTimestamp;
+
+ private T lastStoredValue;
+
+ public ChangingValueFilter(
+ ChangingValueSamplingProcessor processor, long firstTimestamp, T
firstValue) {
+ this.processor = processor;
+ init(firstTimestamp, firstValue);
+ }
+
+ private void init(long firstTimestamp, T firstValue) {
+ lastStoredTimestamp = firstTimestamp;
+ lastStoredValue = firstValue;
+ }
+
+ public boolean filter(long timestamp, T value) {
+ try {
+ return tryFilter(timestamp, value);
+ } catch (Exception e) {
+ init(timestamp, value);
+ return true;
+ }
+ }
+
+ private boolean tryFilter(long timestamp, T value) {
+ final long timeDiff = Math.abs(timestamp - lastStoredTimestamp);
+
+ if (timeDiff <= processor.getCompressionMinTimeInterval()) {
+ return false;
+ }
+
+ if (timeDiff >= processor.getCompressionMaxTimeInterval()) {
+ reset(timestamp, value);
+ return true;
+ }
+
+ // For boolean, string and binary types, we only check whether the value has changed
+ if (value instanceof Boolean || value instanceof String || value
instanceof Binary) {
+ if (Objects.equals(lastStoredValue, value)) {
+ return false;
+ }
+
+ reset(timestamp, value);
+ return true;
+ }
+
+ // For other numerical types, we compare the value difference
+ if (Math.abs(
+ Double.parseDouble(lastStoredValue.toString()) -
Double.parseDouble(value.toString()))
+ > processor.getCompressionDeviation()) {
+ reset(timestamp, value);
+ return true;
+ }
+
+ return false;
+ }
+
+ private void reset(long timestamp, T value) {
+ lastStoredTimestamp = timestamp;
+ lastStoredValue = value;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueSamplingProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueSamplingProcessor.java
new file mode 100644
index 00000000000..6badb70755b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueSamplingProcessor.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.changing;
+
+import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRemarkableRow;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
+import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor;
+import
org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache;
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ChangingValueSamplingProcessor extends DownSamplingProcessor {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ChangingValueSamplingProcessor.class);
+
+ /**
+ * The maximum absolute deviation set by the user: a numeric point whose value differs from the
+ * last stored value by at most compressionDeviation is discarded, subject to the time intervals
+ */
+ private double compressionDeviation;
+
+ /**
+ * The minimum time distance between two stored data points: if the time distance between the
+ * current point and the last stored point is <= compressionMinTimeInterval, the current point
+ * will NOT be stored, regardless of the compression deviation
+ */
+ private long compressionMinTimeInterval;
+
+ /**
+ * The maximum time distance between two stored data points: if the time distance between the
+ * current point and the last stored point is >= compressionMaxTimeInterval, the current point
+ * will be stored, regardless of the compression deviation
+ */
+ private long compressionMaxTimeInterval;
+
+ private PartialPathLastObjectCache<ChangingValueFilter<?>>
pathLastObjectCache;
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ super.validate(validator);
+
+ final PipeParameters parameters = validator.getParameters();
+ compressionDeviation =
+ parameters.getDoubleOrDefault(
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION,
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION_DEFAULT_VALUE);
+ compressionMinTimeInterval =
+ parameters.getLongOrDefault(
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY,
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_DEFAULT_VALUE);
+ compressionMaxTimeInterval =
+ parameters.getLongOrDefault(
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY,
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE);
+
+ validator
+ .validate(
+ compressionDeviation -> (Double) compressionDeviation >= 0,
+ String.format(
+ "%s must be >= 0, but got %s",
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION,
+ compressionDeviation),
+ compressionDeviation)
+ .validate(
+ compressionMinTimeInterval -> (Long) compressionMinTimeInterval >=
0,
+ String.format(
+ "%s must be >= 0, but got %s",
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY,
+ compressionMinTimeInterval),
+ compressionMinTimeInterval)
+ .validate(
+ compressionMaxTimeInterval -> (Long) compressionMaxTimeInterval >=
0,
+ String.format(
+ "%s must be >= 0, but got %s",
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY,
+ compressionMaxTimeInterval),
+ compressionMaxTimeInterval)
+ .validate(
+ minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1],
+ String.format(
+ "%s must be <= %s, but got %s and %s",
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY,
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY,
+ compressionMinTimeInterval,
+ compressionMaxTimeInterval),
+ compressionMinTimeInterval,
+ compressionMaxTimeInterval);
+ }
+
+ @Override
+ public void customize(
+ PipeParameters parameters, PipeProcessorRuntimeConfiguration
configuration) {
+ super.customize(parameters, configuration);
+
+ LOGGER.info(
+ "ChangingValueSamplingProcessor in {} is initialized with {}: {}, {}:
{}, {}: {}.",
+ dataBaseNameWithPathSeparator,
+ PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION,
+ compressionDeviation,
+ PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY,
+ compressionMinTimeInterval,
+ PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY,
+ compressionMaxTimeInterval);
+ }
+
+ @Override
+ protected PartialPathLastObjectCache<?> initPathLastObjectCache(long
memoryLimitInBytes) {
+ pathLastObjectCache =
+ new
PartialPathLastObjectCache<ChangingValueFilter<?>>(memoryLimitInBytes) {
+ @Override
+ protected long calculateMemoryUsage(ChangingValueFilter<?> object) {
+ return 64; // Long.BYTES * 8
+ }
+ };
+ return pathLastObjectCache;
+ }
+
+ @Override
+ protected void processRow(
+ Row row,
+ RowCollector rowCollector,
+ String deviceSuffix,
+ AtomicReference<Exception> exception) {
+ final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow)
row);
+
+ boolean hasNonNullMeasurements = false;
+ for (int i = 0, size = row.size(); i < size; i++) {
+ if (row.isNull(i)) {
+ continue;
+ }
+
+ final String timeSeriesSuffix =
+ deviceSuffix + TsFileConstant.PATH_SEPARATOR + row.getColumnName(i);
+ final ChangingValueFilter filter =
+ pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix);
+
+ if (filter != null) {
+ if (filter.filter(row.getTime(), row.getObject(i))) {
+ hasNonNullMeasurements = true;
+ } else {
+ remarkableRow.markNull(i);
+ }
+ } else {
+ hasNonNullMeasurements = true;
+ pathLastObjectCache.setPartialPathLastObject(
+ timeSeriesSuffix, new ChangingValueFilter<>(this, row.getTime(),
row.getObject(i)));
+ }
+ }
+
+ if (hasNonNullMeasurements) {
+ try {
+ rowCollector.collectRow(remarkableRow);
+ } catch (IOException e) {
+ exception.set(e);
+ }
+ }
+ }
+
+ double getCompressionDeviation() {
+ return compressionDeviation;
+ }
+
+ long getCompressionMinTimeInterval() {
+ return compressionMinTimeInterval;
+ }
+
+ long getCompressionMaxTimeInterval() {
+ return compressionMaxTimeInterval;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
index cdbae8fef59..8d9e73deb07 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.processor.downsampling.sdt;
+import org.apache.iotdb.pipe.api.type.Binary;
+
import java.util.Objects;
public class SwingingDoorTrendingFilter<T> {
@@ -93,7 +95,7 @@ public class SwingingDoorTrendingFilter<T> {
}
// For boolean and string type, we only compare the value
- if (value instanceof Boolean || value instanceof String) {
+ if (value instanceof Boolean || value instanceof String || value
instanceof Binary) {
if (Objects.equals(lastStoredValue, value)) {
return false;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
index 49fa2e2cab8..22bc87b2917 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
@@ -66,6 +66,17 @@ public class PipeProcessorConstant {
"processor.sdt.max-time-interval";
public static final long PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE =
Long.MAX_VALUE;
+ public static final String PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION =
+ "processor.changing-value.compression-deviation";
+ public static final double
PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION_DEFAULT_VALUE = 0;
+ public static final String PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY =
+ "processor.changing-value.min-time-interval";
+ public static final long
PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_DEFAULT_VALUE = 0;
+ public static final String PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY =
+ "processor.changing-value.max-time-interval";
+ public static final long
PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE =
+ Long.MAX_VALUE;
+
public static final String PROCESSOR_OUTPUT_SERIES_KEY =
"processor.output-series";
private PipeProcessorConstant() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 511282ff329..a6ae2e9fd38 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.aggregate.Aggregat
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.aggregate.StandardStatisticsProcessor;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.aggregate.TumblingWindowingProcessor;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor;
+import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.ChangingValueSamplingProcessor;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
@@ -59,6 +60,8 @@ public enum BuiltinPipePlugin {
TUMBLING_TIME_SAMPLING_PROCESSOR(
"tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class),
SDT_SAMPLING_PROCESSOR("sdt-sampling-processor",
SwingingDoorTrendingSamplingProcessor.class),
+ CHANGING_VALUE_SAMPLING_PROCESSOR(
+ "changing-value-sampling-processor",
ChangingValueSamplingProcessor.class),
THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor",
ThrowingExceptionProcessor.class),
AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class),
COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class),
@@ -126,6 +129,7 @@ public enum BuiltinPipePlugin {
// Processors
TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
+
CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(),
AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(),
STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/ChangingValueSamplingProcessor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/ChangingValueSamplingProcessor.java
new file mode 100644
index 00000000000..a9b060ff6ba
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/ChangingValueSamplingProcessor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling;
+
+import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.PlaceHolderProcessor;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents the
+ * changing-value-sampling-processor. There is a real implementation in the server module, but it
+ * cannot be imported here. The pipe agent in the server module will replace this class with the
+ * real implementation when initializing the changing-value-sampling-processor.
+ */
+public class ChangingValueSamplingProcessor extends PlaceHolderProcessor {}