This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch
3617-pipeline-datalake-all-dimentions-selected-break-stream
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3617-pipeline-datalake-all-dimentions-selected-break-stream by this
push:
new 88ea216929 fix(#3617): Validate on pipeline start that not all
properties are dimensions
88ea216929 is described below
commit 88ea21692979a24269134c21d26a85951463ebb8
Author: Philipp Zehnder <[email protected]>
AuthorDate: Tue Jun 3 16:16:11 2025 +0200
fix(#3617): Validate on pipeline start that not all properties are
dimensions
---
.../sinks/internal/jvm/datalake/DataLakeSink.java | 107 ++++++++++++++++-----
1 file changed, 83 insertions(+), 24 deletions(-)
diff --git
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index 5cdeae32a0..c2b361406b 100644
---
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -47,6 +47,8 @@ import
org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
public class DataLakeSink extends StreamPipesDataSink implements
SupportsRuntimeConfig {
private static final String DATABASE_MEASUREMENT_KEY = "db_measurement";
@@ -71,13 +73,13 @@ public class DataLakeSink extends StreamPipesDataSink
implements SupportsRuntime
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.category(DataSinkType.INTERNAL)
.requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.timestampReq(),
- Labels.withId(TIMESTAMP_MAPPING_KEY),
- PropertyScope.NONE
- )
- .build())
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.timestampReq(),
+ Labels.withId(TIMESTAMP_MAPPING_KEY),
+ PropertyScope.NONE
+ )
+ .build())
.requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
.requiredSingleValueSelection(
Labels.withId(SCHEMA_UPDATE_KEY),
@@ -95,20 +97,14 @@ public class DataLakeSink extends StreamPipesDataSink
implements SupportsRuntime
var measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY,
String.class);
var dimensions = extractor.selectedMultiValues(DIMENSIONS_KEY,
String.class);
var ignoreDuplicates = extractor.slideToggleValue(IGNORE_DUPLICATES_KEY);
- var eventSchema = new EventSchema(parameters.getInputSchemaInfos()
- .get(0)
- .getEventSchema()
- .getEventProperties()
- .stream()
- .peek(ep -> {
- if (dimensions.contains(ep.getRuntimeName())) {
- LOG.info("Using {} as dimension", ep.getRuntimeName());
- ep.setPropertyScope(PropertyScope.DIMENSION_PROPERTY.name());
- }
- })
- .toList()
+ var eventSchema = this.assignPropertyScopesBasedOnDimensions(
+ parameters.getInputSchemaInfos()
+ .get(0)
+ .getEventSchema(),
+ dimensions
);
+ this.ensureMeasurementPropertiesExist(eventSchema);
var measure = new DataLakeMeasure(measureName, timestampField,
eventSchema);
@@ -121,11 +117,12 @@ public class DataLakeSink extends StreamPipesDataSink
implements SupportsRuntime
}
measure = new DataExplorerDispatcher().getDataExplorerManager()
- .getMeasurementSanitizer(runtimeContext.getStreamPipesClient(),
measure)
- .sanitizeAndRegister();
+
.getMeasurementSanitizer(runtimeContext.getStreamPipesClient(), measure)
+ .sanitizeAndRegister();
this.timeSeriesStore = new TimeSeriesStore(
- new
DataExplorerDispatcher().getDataExplorerManager().getTimeseriesStorage(measure,
ignoreDuplicates),
+ new DataExplorerDispatcher().getDataExplorerManager()
+ .getTimeseriesStorage(measure,
ignoreDuplicates),
measure,
Environments.getEnvironment(),
true
@@ -144,11 +141,73 @@ public class DataLakeSink extends StreamPipesDataSink
implements SupportsRuntime
}
@Override
- public StaticProperty resolveConfiguration(String staticPropertyInternalName,
- IStaticPropertyExtractor
extractor) {
+ public StaticProperty resolveConfiguration(
+ String staticPropertyInternalName,
+ IStaticPropertyExtractor extractor
+ ) {
var staticProperty = extractor.getStaticPropertyByName(DIMENSIONS_KEY,
RuntimeResolvableAnyStaticProperty.class);
var inputFields = extractor.getInputEventProperties(0);
new DataLakeDimensionProvider().applyOptions(inputFields, staticProperty);
return staticProperty;
}
+
+ /**
+ * Assigns property scopes to event properties based on the provided list of
dimension names.
+ *
+ * @param eventSchema The event schema whose properties will be updated.
+ * @param dimensions The list of property runtime names to be marked as
dimensions.
+ * @return A new {@link EventSchema} with updated property scopes.
+ */
+ private EventSchema assignPropertyScopesBasedOnDimensions(EventSchema
eventSchema, List<String> dimensions) {
+
+ var eventProperties = eventSchema
+ .getEventProperties()
+ .stream()
+ .peek(ep -> {
+        // Set all properties to DIMENSION_PROPERTY when selected in
dimensions
+ if (dimensions.contains(ep.getRuntimeName())) {
+ LOG.info("Using {} as dimension", ep.getRuntimeName());
+ ep.setPropertyScope(PropertyScope.DIMENSION_PROPERTY.name());
+ }
+ })
+ .peek(ep -> {
+        // Remove all properties from DIMENSION_PROPERTY scope if not part
of dimensions
+ if (ep.getPropertyScope()
+ .equals(PropertyScope.DIMENSION_PROPERTY.name())) {
+ if (!dimensions.contains(ep.getRuntimeName())) {
+ ep.setPropertyScope(PropertyScope.MEASUREMENT_PROPERTY.name());
+ }
+ }
+ })
+ .toList();
+
+ return new EventSchema(eventProperties);
+ }
+
+ /**
+ * Validates that not all properties in the given {@link EventSchema} are
marked as dimensions.
+   * If that is the case, InfluxDB is not able to query the data correctly.
+   * This validation is due to the usage of InfluxDB; if we change the
database in the future, we can remove this
+   * validation.
+ *
+ * @param eventSchema The event schema to validate.
+ * @throws SpRuntimeException if all properties are marked as dimensions.
+ */
+ private void ensureMeasurementPropertiesExist(EventSchema eventSchema)
throws SpRuntimeException {
+ int amountOfPropertiesWithoutTimestamp = eventSchema.getEventProperties()
+ .size() - 1;
+ long amountOfDimensionProperties = eventSchema
+ .getEventProperties()
+ .stream()
+ .filter(ep -> PropertyScope.DIMENSION_PROPERTY.name()
+
.equals(ep.getPropertyScope()))
+ .count();
+
+ if (amountOfPropertiesWithoutTimestamp > 0 && amountOfDimensionProperties
== amountOfPropertiesWithoutTimestamp) {
+ throw new SpRuntimeException(
+ "Cannot store data: All properties are marked as dimensions. "
+ + "At least one property must be a measurement value. Please
edit the pipeline to fix this problem.");
+
+ }
+ }
}