This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch add-duplicate-filter-to-timeseries-storage in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit bdeb05be4d54c6459d50aa649c5c6bda26f3f918 Author: Dominik Riemer <[email protected]> AuthorDate: Thu Sep 26 22:38:06 2024 +0200 feat(#3259): Add duplicate filter to timeseries storage --- .../dataexplorer/api/IDataExplorerManager.java | 10 ++-- .../influx/DataExplorerManagerInflux.java | 23 ++++---- .../influx/PropertyDuplicateFilter.java | 45 ++++++++++++++++ .../dataexplorer/influx/PropertyHandler.java | 51 ++++++++++-------- .../influx/TimeSeriesStorageInflux.java | 10 +++- .../influx/PropertyDuplicateFilterTest.java | 63 ++++++++++++++++++++++ .../iotdb/DataExplorerManagerIotDb.java | 3 +- .../sinks/internal/jvm/datalake/DataLakeSink.java | 5 +- .../migrations/DataLakeSinkMigrationV2.java | 29 +++++++++- .../strings.en | 3 ++ .../static-slide-toggle.component.html | 1 - .../static-slide-toggle.component.ts | 1 + 12 files changed, 203 insertions(+), 41 deletions(-) diff --git a/streampipes-data-explorer-api/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerManager.java b/streampipes-data-explorer-api/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerManager.java index ef1b68a0ca..586a77706c 100644 --- a/streampipes-data-explorer-api/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerManager.java +++ b/streampipes-data-explorer-api/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerManager.java @@ -34,15 +34,19 @@ public interface IDataExplorerManager { * @return An instance of {@link IDataLakeMeasurementCounter} configured to count the sizes of the specified measurements. 
*/ IDataLakeMeasurementCounter getMeasurementCounter( - List<DataLakeMeasure> allMeasurements, - List<String> measurementsToCount + List<DataLakeMeasure> allMeasurements, + List<String> measurementsToCount ); IDataExplorerQueryManagement getQueryManagement(IDataExplorerSchemaManagement dataExplorerSchemaManagement); IDataExplorerSchemaManagement getSchemaManagement(); - ITimeSeriesStorage getTimeseriesStorage(DataLakeMeasure measure); + default ITimeSeriesStorage getTimeseriesStorage(DataLakeMeasure measure) { + return getTimeseriesStorage(measure, false); + } + + ITimeSeriesStorage getTimeseriesStorage(DataLakeMeasure measure, boolean ignoreDuplicates); IDataLakeMeasurementSanitizer getMeasurementSanitizer(IStreamPipesClient client, DataLakeMeasure measure); } diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerManagerInflux.java b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerManagerInflux.java index 1c25d5fbc5..869f314b1d 100644 --- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerManagerInflux.java +++ b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerManagerInflux.java @@ -21,9 +21,9 @@ package org.apache.streampipes.dataexplorer.influx; import org.apache.streampipes.client.api.IStreamPipesClient; import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement; +import org.apache.streampipes.dataexplorer.api.IDataExplorerManager; import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement; import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement; -import org.apache.streampipes.dataexplorer.api.IDataExplorerManager; import org.apache.streampipes.dataexplorer.api.IDataLakeMeasurementCounter; import 
org.apache.streampipes.dataexplorer.api.IDataLakeMeasurementSanitizer; import org.apache.streampipes.dataexplorer.api.ITimeSeriesStorage; @@ -38,28 +38,33 @@ public class DataExplorerManagerInflux implements IDataExplorerManager { @Override public IDataLakeMeasurementCounter getMeasurementCounter( - List<DataLakeMeasure> allMeasurements, - List<String> measurementsToCount) { + List<DataLakeMeasure> allMeasurements, + List<String> measurementsToCount) { return new DataLakeMeasurementCounterInflux(allMeasurements, measurementsToCount); } @Override public IDataExplorerQueryManagement getQueryManagement( - IDataExplorerSchemaManagement dataExplorerSchemaManagement - ) { + IDataExplorerSchemaManagement dataExplorerSchemaManagement + ) { return new DataExplorerQueryManagementInflux(dataExplorerSchemaManagement); } @Override public IDataExplorerSchemaManagement getSchemaManagement() { return new DataExplorerSchemaManagement(StorageDispatcher.INSTANCE - .getNoSqlStore() - .getDataLakeStorage()); + .getNoSqlStore() + .getDataLakeStorage()); } @Override - public ITimeSeriesStorage getTimeseriesStorage(DataLakeMeasure measure) { - return new TimeSeriesStorageInflux(measure, Environments.getEnvironment(), new InfluxClientProvider()); + public ITimeSeriesStorage getTimeseriesStorage(DataLakeMeasure measure, boolean ignoreDuplicates) { + return new TimeSeriesStorageInflux( + measure, + ignoreDuplicates, + Environments.getEnvironment(), + new InfluxClientProvider() + ); } @Override diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/PropertyDuplicateFilter.java b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/PropertyDuplicateFilter.java new file mode 100644 index 0000000000..fe70b15eae --- /dev/null +++ b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/PropertyDuplicateFilter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.dataexplorer.influx; + +import org.apache.streampipes.model.runtime.field.PrimitiveField; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public class PropertyDuplicateFilter { + + private final boolean ignoreDuplicates; + private final Map<String, Object> lastValues; + + public PropertyDuplicateFilter(boolean ignoreDuplicates) { + this.ignoreDuplicates = ignoreDuplicates; + this.lastValues = new HashMap<>(); + } + + public boolean shouldIgnoreField(String sanitizedRuntimeName, + PrimitiveField eventPropertyPrimitiveField) { + var rawValue = eventPropertyPrimitiveField.getRawValue(); + boolean shouldIgnore = ignoreDuplicates && lastValues.containsKey(sanitizedRuntimeName) + && Objects.equals(lastValues.get(sanitizedRuntimeName), rawValue); + lastValues.put(sanitizedRuntimeName, rawValue); + return shouldIgnore; + } +} diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/PropertyHandler.java b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/PropertyHandler.java index 74d03ba3e2..a19c7d9ed8 100644 --- 
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/PropertyHandler.java +++ b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/PropertyHandler.java @@ -34,10 +34,13 @@ public class PropertyHandler { private static final Logger LOG = LoggerFactory.getLogger(PropertyHandler.class); - private RawFieldSerializer rawFieldSerializer; + private final RawFieldSerializer rawFieldSerializer; + private final PropertyDuplicateFilter duplicateFilter; - public PropertyHandler() { + + public PropertyHandler(PropertyDuplicateFilter duplicateFilter) { rawFieldSerializer = new RawFieldSerializer(); + this.duplicateFilter = duplicateFilter; } /** @@ -109,30 +112,32 @@ public class PropertyHandler { String sanitizedRuntimeName, PrimitiveField eventPropertyPrimitiveField ) { - // Store property according to property type - if (XSD.INTEGER.toString() - .equals(runtimeType)) { - handleIntegerProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); - } else if (XSD.LONG.toString() - .equals(runtimeType)) { - handleLongProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); - } else if (XSD.FLOAT.toString() - .equals(runtimeType)) { - handleFloatProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); - } else if (XSD.DOUBLE.toString() - .equals(runtimeType)) { - handleDoubleProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); - } else if (XSD.BOOLEAN.toString() - .equals(runtimeType)) { - handleBooleanProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); - } else if (SO.NUMBER.equals(runtimeType)) { - handleDoubleProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); - } else { - handleStringProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); + + if (!duplicateFilter.shouldIgnoreField(sanitizedRuntimeName, eventPropertyPrimitiveField)) { + // Store property according to property type + if (XSD.INTEGER.toString() + .equals(runtimeType)) { + 
handleIntegerProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); + } else if (XSD.LONG.toString() + .equals(runtimeType)) { + handleLongProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); + } else if (XSD.FLOAT.toString() + .equals(runtimeType)) { + handleFloatProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); + } else if (XSD.DOUBLE.toString() + .equals(runtimeType)) { + handleDoubleProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); + } else if (XSD.BOOLEAN.toString() + .equals(runtimeType)) { + handleBooleanProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); + } else if (SO.NUMBER.equals(runtimeType)) { + handleDoubleProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); + } else { + handleStringProperty(p, sanitizedRuntimeName, eventPropertyPrimitiveField); + } } } - private void handleStringProperty( Point.Builder p, String sanitizedRuntimeName, diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInflux.java b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInflux.java index fa35cbe9b9..799808541e 100644 --- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInflux.java +++ b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInflux.java @@ -38,15 +38,23 @@ public class TimeSeriesStorageInflux extends TimeSeriesStorage { private final PropertyHandler propertyHandler; + public TimeSeriesStorageInflux( + DataLakeMeasure measure, + Environment environment, + InfluxClientProvider influxClientProvider + ) throws SpRuntimeException { + this(measure, false, environment, influxClientProvider); + } public TimeSeriesStorageInflux( DataLakeMeasure measure, + boolean ignoreDuplicates, Environment environment, InfluxClientProvider influxClientProvider ) throws 
SpRuntimeException { super(measure); this.influxDb = influxClientProvider.getSetUpInfluxDBClient(environment); - propertyHandler = new PropertyHandler(); + propertyHandler = new PropertyHandler(new PropertyDuplicateFilter(ignoreDuplicates)); } protected void writeToTimeSeriesStorage(Event event) throws SpRuntimeException { diff --git a/streampipes-data-explorer-influx/src/test/java/org/apache/streampipes/dataexplorer/influx/PropertyDuplicateFilterTest.java b/streampipes-data-explorer-influx/src/test/java/org/apache/streampipes/dataexplorer/influx/PropertyDuplicateFilterTest.java new file mode 100644 index 0000000000..0b9acb8a62 --- /dev/null +++ b/streampipes-data-explorer-influx/src/test/java/org/apache/streampipes/dataexplorer/influx/PropertyDuplicateFilterTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.streampipes.dataexplorer.influx; + +import org.apache.streampipes.model.runtime.field.PrimitiveField; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PropertyDuplicateFilterTest { + + @Test + public void testDuplicate() { + var filter = new PropertyDuplicateFilter(true); + var field = new PrimitiveField(null, null, 1); + var result = filter.shouldIgnoreField("test", field); + + assertFalse(result); + + field = new PrimitiveField(null, null, 2); + result = filter.shouldIgnoreField("test", field); + assertFalse(result); + + field = new PrimitiveField(null, null, 2); + result = filter.shouldIgnoreField("test1", field); + assertFalse(result); + + field = new PrimitiveField(null, null, 2); + result = filter.shouldIgnoreField("test", field); + assertTrue(result); + } + + @Test + public void testDuplicateDeactivated() { + var filter = new PropertyDuplicateFilter(false); + var field = new PrimitiveField(null, null, 1); + var result = filter.shouldIgnoreField("test", field); + + assertFalse(result); + + field = new PrimitiveField(null, null, 1); + result = filter.shouldIgnoreField("test", field); + assertFalse(result); + } +} diff --git a/streampipes-data-explorer-iotdb/src/main/java/org/apache/streampipes/dataexplorer/iotdb/DataExplorerManagerIotDb.java b/streampipes-data-explorer-iotdb/src/main/java/org/apache/streampipes/dataexplorer/iotdb/DataExplorerManagerIotDb.java index 97ef58e99e..31379d06df 100644 --- a/streampipes-data-explorer-iotdb/src/main/java/org/apache/streampipes/dataexplorer/iotdb/DataExplorerManagerIotDb.java +++ b/streampipes-data-explorer-iotdb/src/main/java/org/apache/streampipes/dataexplorer/iotdb/DataExplorerManagerIotDb.java @@ -56,7 +56,8 @@ public class DataExplorerManagerIotDb implements IDataExplorerManager { } @Override - public ITimeSeriesStorage getTimeseriesStorage(DataLakeMeasure 
measure) { + public ITimeSeriesStorage getTimeseriesStorage(DataLakeMeasure measure, boolean ignoreDuplicates) { + // TODO: ignoreDuplicates not yet supported for IotDB return new TimeSeriesStorageIotDb(measure, new IotDbPropertyConverter(), new IotDbSessionProvider()); } diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java index 5417a17dd7..5cdeae32a0 100644 --- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java @@ -53,6 +53,7 @@ public class DataLakeSink extends StreamPipesDataSink implements SupportsRuntime private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping"; public static final String SCHEMA_UPDATE_KEY = "schema_update"; public static final String DIMENSIONS_KEY = "dimensions_selection"; + public static final String IGNORE_DUPLICATES_KEY = "ignore_duplicates"; public static final String SCHEMA_UPDATE_OPTION = "Update schema"; @@ -83,6 +84,7 @@ public class DataLakeSink extends StreamPipesDataSink implements SupportsRuntime Options.from(SCHEMA_UPDATE_OPTION, EXTEND_EXISTING_SCHEMA_OPTION) ) .requiredMultiValueSelectionFromContainer(Labels.withId(DIMENSIONS_KEY)) + .requiredSlideToggle(Labels.withId(IGNORE_DUPLICATES_KEY), false) .build(); } @@ -92,6 +94,7 @@ public class DataLakeSink extends StreamPipesDataSink implements SupportsRuntime var timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY); var measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class); var dimensions = extractor.selectedMultiValues(DIMENSIONS_KEY, String.class); + var 
ignoreDuplicates = extractor.slideToggleValue(IGNORE_DUPLICATES_KEY); var eventSchema = new EventSchema(parameters.getInputSchemaInfos() .get(0) .getEventSchema() @@ -122,7 +125,7 @@ public class DataLakeSink extends StreamPipesDataSink implements SupportsRuntime .sanitizeAndRegister(); this.timeSeriesStore = new TimeSeriesStore( - new DataExplorerDispatcher().getDataExplorerManager().getTimeseriesStorage(measure), + new DataExplorerDispatcher().getDataExplorerManager().getTimeseriesStorage(measure, ignoreDuplicates), measure, Environments.getEnvironment(), true diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2.java index d47bf06037..5a81ba4cd7 100644 --- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2.java +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2.java @@ -25,6 +25,7 @@ import org.apache.streampipes.model.graph.DataSinkInvocation; import org.apache.streampipes.model.migration.MigrationResult; import org.apache.streampipes.model.migration.ModelMigratorConfig; import org.apache.streampipes.model.staticproperty.RuntimeResolvableAnyStaticProperty; +import org.apache.streampipes.model.staticproperty.SlideToggleStaticProperty; import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeDimensionProvider; import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink; @@ -43,7 +44,17 @@ public class DataLakeSinkMigrationV2 implements IDataSinkMigrator { @Override public MigrationResult<DataSinkInvocation> 
migrate(DataSinkInvocation element, IDataSinkParameterExtractor extractor) throws RuntimeException { - var label = Labels.from(DataLakeSink.DIMENSIONS_KEY, "Dimensions", "Selected fields will be stored as dimensions."); + addDimensionSelection(element); + addDuplicateToggle(element); + return MigrationResult.success(element); + } + + private void addDimensionSelection(DataSinkInvocation element) { + var label = Labels.from( + DataLakeSink.DIMENSIONS_KEY, + "Dimensions", + "Selected fields will be stored as dimensions." + ); var staticProperty = new RuntimeResolvableAnyStaticProperty( label.getInternalId(), label.getLabel(), @@ -53,6 +64,20 @@ public class DataLakeSinkMigrationV2 implements IDataSinkMigrator { new DataLakeDimensionProvider().applyOptions(inputFields, staticProperty); element.getStaticProperties().add(staticProperty); - return MigrationResult.success(element); + } + + private void addDuplicateToggle(DataSinkInvocation element) { + var label = Labels.from( + DataLakeSink.IGNORE_DUPLICATES_KEY, + "Ignore duplicates", + "Fields having the same value than the previous event are not stored." 
+ ); + var staticProperty = new SlideToggleStaticProperty( + label.getInternalId(), + label.getLabel(), + label.getDescription(), + false); + + element.getStaticProperties().add(staticProperty); } } diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en index 4c23b83d27..d1fcc39c7e 100644 --- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en @@ -29,3 +29,6 @@ schema_update.description=Update existing schemas with the new one or extend the dimensions_selection.title=Dimensions dimensions_selection.description=Selected fields will be stored as dimensions. + +ignore_duplicates.title=Ignore duplicates +ignore_duplicates.description=Fields having the same value as the previous event are not stored. This only affects measurements, not tags.
diff --git a/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.html b/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.html index 0863899fa6..e3a4e5c69c 100644 --- a/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.html +++ b/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.html @@ -22,7 +22,6 @@ fxFlex formControlName="{{ fieldName }}" color="accent" - required [attr.data-cy]="fieldName" > {{ staticProperty.description }} diff --git a/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.ts b/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.ts index 64e17a6005..e74274ae7f 100644 --- a/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.ts +++ b/ui/src/app/core-ui/static-properties/static-slide-toggle/static-slide-toggle.component.ts @@ -33,6 +33,7 @@ export class StaticSlideToggleComponent ngOnInit(): void { this.addValidator(this.staticProperty.selected, Validators.required); this.enableValidators(); + this.emitUpdate(); } emitUpdate() {
