This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 
3617-pipeline-datalake-all-dimentions-selected-break-stream
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to 
refs/heads/3617-pipeline-datalake-all-dimentions-selected-break-stream by this 
push:
     new 88ea216929 fix(#3617): Validate on pipeline start that not all 
properties are dimensions
88ea216929 is described below

commit 88ea21692979a24269134c21d26a85951463ebb8
Author: Philipp Zehnder <[email protected]>
AuthorDate: Tue Jun 3 16:16:11 2025 +0200

    fix(#3617): Validate on pipeline start that not all properties are 
dimensions
---
 .../sinks/internal/jvm/datalake/DataLakeSink.java  | 107 ++++++++++++++++-----
 1 file changed, 83 insertions(+), 24 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index 5cdeae32a0..c2b361406b 100644
--- 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++ 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -47,6 +47,8 @@ import 
org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 public class DataLakeSink extends StreamPipesDataSink implements 
SupportsRuntimeConfig {
 
   private static final String DATABASE_MEASUREMENT_KEY = "db_measurement";
@@ -71,13 +73,13 @@ public class DataLakeSink extends StreamPipesDataSink 
implements SupportsRuntime
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .category(DataSinkType.INTERNAL)
         .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.timestampReq(),
-                Labels.withId(TIMESTAMP_MAPPING_KEY),
-                PropertyScope.NONE
-            )
-            .build())
+                            .create()
+                            .requiredPropertyWithUnaryMapping(
+                                EpRequirements.timestampReq(),
+                                Labels.withId(TIMESTAMP_MAPPING_KEY),
+                                PropertyScope.NONE
+                            )
+                            .build())
         .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
         .requiredSingleValueSelection(
             Labels.withId(SCHEMA_UPDATE_KEY),
@@ -95,20 +97,14 @@ public class DataLakeSink extends StreamPipesDataSink 
implements SupportsRuntime
     var measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, 
String.class);
     var dimensions = extractor.selectedMultiValues(DIMENSIONS_KEY, 
String.class);
     var ignoreDuplicates = extractor.slideToggleValue(IGNORE_DUPLICATES_KEY);
-    var eventSchema = new EventSchema(parameters.getInputSchemaInfos()
-        .get(0)
-        .getEventSchema()
-        .getEventProperties()
-        .stream()
-        .peek(ep -> {
-          if (dimensions.contains(ep.getRuntimeName())) {
-            LOG.info("Using {} as dimension", ep.getRuntimeName());
-            ep.setPropertyScope(PropertyScope.DIMENSION_PROPERTY.name());
-          }
-        })
-        .toList()
+    var eventSchema = this.assignPropertyScopesBasedOnDimensions(
+        parameters.getInputSchemaInfos()
+                  .get(0)
+                  .getEventSchema(),
+        dimensions
     );
 
+    this.ensureMeasurementPropertiesExist(eventSchema);
 
     var measure = new DataLakeMeasure(measureName, timestampField, 
eventSchema);
 
@@ -121,11 +117,12 @@ public class DataLakeSink extends StreamPipesDataSink 
implements SupportsRuntime
     }
 
     measure = new DataExplorerDispatcher().getDataExplorerManager()
-        .getMeasurementSanitizer(runtimeContext.getStreamPipesClient(), 
measure)
-        .sanitizeAndRegister();
+                                          
.getMeasurementSanitizer(runtimeContext.getStreamPipesClient(), measure)
+                                          .sanitizeAndRegister();
 
     this.timeSeriesStore = new TimeSeriesStore(
-        new 
DataExplorerDispatcher().getDataExplorerManager().getTimeseriesStorage(measure, 
ignoreDuplicates),
+        new DataExplorerDispatcher().getDataExplorerManager()
+                                    .getTimeseriesStorage(measure, 
ignoreDuplicates),
         measure,
         Environments.getEnvironment(),
         true
@@ -144,11 +141,73 @@ public class DataLakeSink extends StreamPipesDataSink 
implements SupportsRuntime
   }
 
   @Override
-  public StaticProperty resolveConfiguration(String staticPropertyInternalName,
-                                             IStaticPropertyExtractor 
extractor) {
+  public StaticProperty resolveConfiguration(
+      String staticPropertyInternalName,
+      IStaticPropertyExtractor extractor
+  ) {
     var staticProperty = extractor.getStaticPropertyByName(DIMENSIONS_KEY, 
RuntimeResolvableAnyStaticProperty.class);
     var inputFields = extractor.getInputEventProperties(0);
     new DataLakeDimensionProvider().applyOptions(inputFields, staticProperty);
     return staticProperty;
   }
+
+  /**
+   * Assigns property scopes to event properties based on the provided list of 
dimension names.
+   *
+   * @param eventSchema The event schema whose properties will be updated.
+   * @param dimensions  The list of property runtime names to be marked as 
dimensions.
+   * @return A new {@link EventSchema} with updated property scopes.
+   */
+  private EventSchema assignPropertyScopesBasedOnDimensions(EventSchema 
eventSchema, List<String> dimensions) {
+
+    var eventProperties = eventSchema
+        .getEventProperties()
+        .stream()
+        .peek(ep -> {
+          // Set all properties to DIMENSION_PROPERTY when selected in 
dimensions
+          if (dimensions.contains(ep.getRuntimeName())) {
+            LOG.info("Using {} as dimension", ep.getRuntimeName());
+            ep.setPropertyScope(PropertyScope.DIMENSION_PROPERTY.name());
+          }
+        })
+        .peek(ep -> {
+          // Remove all dimensions from DIMENSION_PROPERTY scope if not part 
of dimensions
+          if (ep.getPropertyScope()
+                .equals(PropertyScope.DIMENSION_PROPERTY.name())) {
+            if (!dimensions.contains(ep.getRuntimeName())) {
+              ep.setPropertyScope(PropertyScope.MEASUREMENT_PROPERTY.name());
+            }
+          }
+        })
+        .toList();
+
+    return new EventSchema(eventProperties);
+  }
+
+  /**
+   * Validates that not all properties in the given {@link EventSchema} are 
marked as dimensions.
+   * If that is the case, InfluxDB is not able to query the data correctly.
+   * This validation is due to the usage of InfluxDB; if we change the 
database in the future, we can remove this
+   * validation.
+   *
+   * @param eventSchema The event schema to validate.
+   * @throws SpRuntimeException if all properties are marked as dimensions.
+   */
+  private void ensureMeasurementPropertiesExist(EventSchema eventSchema) 
throws SpRuntimeException {
+    int amountOfPropertiesWithoutTimestamp = eventSchema.getEventProperties()
+                                                        .size() - 1;
+    long amountOfDimensionProperties = eventSchema
+        .getEventProperties()
+        .stream()
+        .filter(ep -> PropertyScope.DIMENSION_PROPERTY.name()
+                                                      
.equals(ep.getPropertyScope()))
+        .count();
+
+    if (amountOfPropertiesWithoutTimestamp > 0 && amountOfDimensionProperties 
== amountOfPropertiesWithoutTimestamp) {
+      throw new SpRuntimeException(
+          "Cannot store data: All properties are marked as dimensions. "
+              + "At least one property must be a measurement value. Please 
edit the pipeline to fix this problem.");
+
+    }
+  }
 }

Reply via email to