This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d564e33618 Pipe: implemented ChangingValueSamplingProcessor (#12466)
9d564e33618 is described below

commit 9d564e336183eac5bae5ef0f8281525430add690
Author: ppppoooo <[email protected]>
AuthorDate: Mon May 20 19:38:07 2024 +0800

    Pipe: implemented ChangingValueSamplingProcessor (#12466)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../PipeDataRegionProcessorConstructor.java        |   4 +
 .../downsampling/changing/ChangingValueFilter.java |  95 ++++++++++
 .../changing/ChangingValueSamplingProcessor.java   | 199 +++++++++++++++++++++
 .../sdt/SwingingDoorTrendingFilter.java            |   4 +-
 .../config/constant/PipeProcessorConstant.java     |  11 ++
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |   4 +
 .../ChangingValueSamplingProcessor.java            |  30 ++++
 7 files changed, 346 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index 15578f2fc2a..6d747d156a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
 import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor;
 import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor;
 import 
org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor;
+import 
org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor;
 import 
org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
 import 
org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
 import 
org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor;
@@ -47,6 +48,9 @@ class PipeDataRegionProcessorConstructor extends 
PipeProcessorConstructor {
     pluginConstructors.put(
         BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(),
         SwingingDoorTrendingSamplingProcessor::new);
+    pluginConstructors.put(
+        
BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(),
+        ChangingValueSamplingProcessor::new);
     pluginConstructors.put(
         BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(),
         ThrowingExceptionProcessor::new);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java
new file mode 100644
index 00000000000..cc83fbeb21e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.changing;
+
+import org.apache.iotdb.pipe.api.type.Binary;
+
+import java.util.Objects;
+
+public class ChangingValueFilter<T> {
+
+  private final ChangingValueSamplingProcessor processor;
+
+  /**
+   * The last stored time and value we compare current point against 
lastReadTimestamp and
+   * lastReadValue
+   */
+  private long lastStoredTimestamp;
+
+  private T lastStoredValue;
+
+  public ChangingValueFilter(
+      ChangingValueSamplingProcessor processor, long firstTimestamp, T 
firstValue) {
+    this.processor = processor;
+    init(firstTimestamp, firstValue);
+  }
+
+  private void init(long firstTimestamp, T firstValue) {
+    lastStoredTimestamp = firstTimestamp;
+    lastStoredValue = firstValue;
+  }
+
+  public boolean filter(long timestamp, T value) {
+    try {
+      return tryFilter(timestamp, value);
+    } catch (Exception e) {
+      init(timestamp, value);
+      return true;
+    }
+  }
+
+  private boolean tryFilter(long timestamp, T value) {
+    final long timeDiff = Math.abs(timestamp - lastStoredTimestamp);
+
+    if (timeDiff <= processor.getCompressionMinTimeInterval()) {
+      return false;
+    }
+
+    if (timeDiff >= processor.getCompressionMaxTimeInterval()) {
+      reset(timestamp, value);
+      return true;
+    }
+
+    // For boolean and string type, we only compare the value
+    if (value instanceof Boolean || value instanceof String || value 
instanceof Binary) {
+      if (Objects.equals(lastStoredValue, value)) {
+        return false;
+      }
+
+      reset(timestamp, value);
+      return true;
+    }
+
+    // For other numerical types, we compare the value difference
+    if (Math.abs(
+            Double.parseDouble(lastStoredValue.toString()) - 
Double.parseDouble(value.toString()))
+        > processor.getCompressionDeviation()) {
+      reset(timestamp, value);
+      return true;
+    }
+
+    return false;
+  }
+
+  private void reset(long timestamp, T value) {
+    lastStoredTimestamp = timestamp;
+    lastStoredValue = value;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueSamplingProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueSamplingProcessor.java
new file mode 100644
index 00000000000..6badb70755b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueSamplingProcessor.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.changing;
+
+import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRemarkableRow;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
+import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor;
+import 
org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache;
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ChangingValueSamplingProcessor extends DownSamplingProcessor {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ChangingValueSamplingProcessor.class);
+
+  /**
+   * The maximum absolute difference the user set if the data's value is within
+   * compressionDeviation, it will be compressed and discarded after 
compression
+   */
+  private double compressionDeviation;
+
+  /**
+   * The minimum time distance between two stored data points if current point 
time to the last
+   * stored point time distance <= compressionMinTimeInterval, current point 
will NOT be stored
+   * regardless of compression deviation
+   */
+  private long compressionMinTimeInterval;
+
+  /**
+   * The maximum time distance between two stored data points if current point 
time to the last
+   * stored point time distance >= compressionMaxTimeInterval, current point 
will be stored
+   * regardless of compression deviation
+   */
+  private long compressionMaxTimeInterval;
+
+  private PartialPathLastObjectCache<ChangingValueFilter<?>> 
pathLastObjectCache;
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    super.validate(validator);
+
+    final PipeParameters parameters = validator.getParameters();
+    compressionDeviation =
+        parameters.getDoubleOrDefault(
+            
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION,
+            
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION_DEFAULT_VALUE);
+    compressionMinTimeInterval =
+        parameters.getLongOrDefault(
+            
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY,
+            
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_DEFAULT_VALUE);
+    compressionMaxTimeInterval =
+        parameters.getLongOrDefault(
+            
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY,
+            
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE);
+
+    validator
+        .validate(
+            compressionDeviation -> (Double) compressionDeviation >= 0,
+            String.format(
+                "%s must be >= 0, but got %s",
+                
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION,
+                compressionDeviation),
+            compressionDeviation)
+        .validate(
+            compressionMinTimeInterval -> (Long) compressionMinTimeInterval >= 
0,
+            String.format(
+                "%s must be >= 0, but got %s",
+                
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY,
+                compressionMinTimeInterval),
+            compressionMinTimeInterval)
+        .validate(
+            compressionMaxTimeInterval -> (Long) compressionMaxTimeInterval >= 
0,
+            String.format(
+                "%s must be >= 0, but got %s",
+                
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY,
+                compressionMaxTimeInterval),
+            compressionMaxTimeInterval)
+        .validate(
+            minMaxPair -> (Long) minMaxPair[0] <= (Long) minMaxPair[1],
+            String.format(
+                "%s must be <= %s, but got %s and %s",
+                
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY,
+                
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY,
+                compressionMinTimeInterval,
+                compressionMaxTimeInterval),
+            compressionMinTimeInterval,
+            compressionMaxTimeInterval);
+  }
+
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeProcessorRuntimeConfiguration 
configuration) {
+    super.customize(parameters, configuration);
+
+    LOGGER.info(
+        "ChangingValueSamplingProcessor in {} is initialized with {}: {}, {}: 
{}, {}: {}.",
+        dataBaseNameWithPathSeparator,
+        PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION,
+        compressionDeviation,
+        PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY,
+        compressionMinTimeInterval,
+        PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY,
+        compressionMaxTimeInterval);
+  }
+
+  @Override
+  protected PartialPathLastObjectCache<?> initPathLastObjectCache(long 
memoryLimitInBytes) {
+    pathLastObjectCache =
+        new 
PartialPathLastObjectCache<ChangingValueFilter<?>>(memoryLimitInBytes) {
+          @Override
+          protected long calculateMemoryUsage(ChangingValueFilter<?> object) {
+            return 64; // Long.BYTES * 8
+          }
+        };
+    return pathLastObjectCache;
+  }
+
+  @Override
+  protected void processRow(
+      Row row,
+      RowCollector rowCollector,
+      String deviceSuffix,
+      AtomicReference<Exception> exception) {
+    final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow) 
row);
+
+    boolean hasNonNullMeasurements = false;
+    for (int i = 0, size = row.size(); i < size; i++) {
+      if (row.isNull(i)) {
+        continue;
+      }
+
+      final String timeSeriesSuffix =
+          deviceSuffix + TsFileConstant.PATH_SEPARATOR + row.getColumnName(i);
+      final ChangingValueFilter filter =
+          pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix);
+
+      if (filter != null) {
+        if (filter.filter(row.getTime(), row.getObject(i))) {
+          hasNonNullMeasurements = true;
+        } else {
+          remarkableRow.markNull(i);
+        }
+      } else {
+        hasNonNullMeasurements = true;
+        pathLastObjectCache.setPartialPathLastObject(
+            timeSeriesSuffix, new ChangingValueFilter<>(this, row.getTime(), 
row.getObject(i)));
+      }
+    }
+
+    if (hasNonNullMeasurements) {
+      try {
+        rowCollector.collectRow(remarkableRow);
+      } catch (IOException e) {
+        exception.set(e);
+      }
+    }
+  }
+
+  double getCompressionDeviation() {
+    return compressionDeviation;
+  }
+
+  long getCompressionMinTimeInterval() {
+    return compressionMinTimeInterval;
+  }
+
+  long getCompressionMaxTimeInterval() {
+    return compressionMaxTimeInterval;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
index cdbae8fef59..8d9e73deb07 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.processor.downsampling.sdt;
 
+import org.apache.iotdb.pipe.api.type.Binary;
+
 import java.util.Objects;
 
 public class SwingingDoorTrendingFilter<T> {
@@ -93,7 +95,7 @@ public class SwingingDoorTrendingFilter<T> {
     }
 
     // For boolean and string type, we only compare the value
-    if (value instanceof Boolean || value instanceof String) {
+    if (value instanceof Boolean || value instanceof String || value 
instanceof Binary) {
       if (Objects.equals(lastStoredValue, value)) {
         return false;
       }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
index 49fa2e2cab8..22bc87b2917 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
@@ -66,6 +66,17 @@ public class PipeProcessorConstant {
       "processor.sdt.max-time-interval";
   public static final long PROCESSOR_SDT_MAX_TIME_INTERVAL_DEFAULT_VALUE = 
Long.MAX_VALUE;
 
+  public static final String PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION =
+      "processor.changing-value.compression-deviation";
+  public static final double 
PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION_DEFAULT_VALUE = 0;
+  public static final String PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY =
+      "processor.changing-value.min-time-interval";
+  public static final long 
PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_DEFAULT_VALUE = 0;
+  public static final String PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY =
+      "processor.changing-value.max-time-interval";
+  public static final long 
PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE =
+      Long.MAX_VALUE;
+
   public static final String PROCESSOR_OUTPUT_SERIES_KEY = 
"processor.output-series";
 
   private PipeProcessorConstant() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 511282ff329..a6ae2e9fd38 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.aggregate.Aggregat
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.aggregate.StandardStatisticsProcessor;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.aggregate.TumblingWindowingProcessor;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor;
+import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.ChangingValueSamplingProcessor;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
@@ -59,6 +60,8 @@ public enum BuiltinPipePlugin {
   TUMBLING_TIME_SAMPLING_PROCESSOR(
       "tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class),
   SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", 
SwingingDoorTrendingSamplingProcessor.class),
+  CHANGING_VALUE_SAMPLING_PROCESSOR(
+      "changing-value-sampling-processor", 
ChangingValueSamplingProcessor.class),
   THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", 
ThrowingExceptionProcessor.class),
   AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class),
   COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class),
@@ -126,6 +129,7 @@ public enum BuiltinPipePlugin {
                   // Processors
                   
TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
                   SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
+                  
CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
                   
THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(),
                   AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(),
                   
STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/ChangingValueSamplingProcessor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/ChangingValueSamplingProcessor.java
new file mode 100644
index 00000000000..a9b060ff6ba
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/downsampling/ChangingValueSamplingProcessor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor.downsampling;
+
+import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.PlaceHolderProcessor;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents the
+ * changing-value-sampling-processor. There is a real implementation in the 
server module but cannot
+ * be imported here. The pipe agent in the server module will replace this 
class with the real
+ * implementation when initializing the changing-value-sampling-processor.
+ */
+public class ChangingValueSamplingProcessor extends PlaceHolderProcessor {}

Reply via email to