This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch add-manual-data-lake-dimensions-selection
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 790dc86103402ea559be320e1fe13474165d439e
Author: Dominik Riemer <[email protected]>
AuthorDate: Mon Aug 12 15:54:38 2024 +0200

    feat: Support manual assignments of dimensions in data lake sink
---
 .../api/extractor/IParameterExtractor.java         |  2 +
 .../jvm/InternalSinksExtensionModuleExports.java   |  5 +-
 .../jvm/datalake/DataLakeDimensionProvider.java    | 70 ++++++++++++++++
 .../sinks/internal/jvm/datalake/DataLakeSink.java  | 76 +++++++++++------
 .../migrations/DataLakeSinkMigrationV2.java        | 58 +++++++++++++
 .../documentation.md                               | 10 +++
 .../strings.en                                     |  3 +
 .../migrations/DataLakeSinkMigrationV2Test.java    | 95 ++++++++++++++++++++++
 .../sdk/extractor/AbstractParameterExtractor.java  |  6 ++
 9 files changed, 301 insertions(+), 24 deletions(-)

diff --git 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IParameterExtractor.java
 
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IParameterExtractor.java
index dbcdb4eba4..8efad70197 100644
--- 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IParameterExtractor.java
+++ 
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IParameterExtractor.java
@@ -95,4 +95,6 @@ public interface IParameterExtractor {
   List<String> getEventPropertiesSelectorByScope(PropertyScope scope);
 
   List<EventProperty> getEventPropertiesByScope(PropertyScope scope);
+
+  List<EventProperty> getInputEventProperties(int streamIndex);
 }
diff --git 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/InternalSinksExtensionModuleExports.java
 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/InternalSinksExtensionModuleExports.java
index 8dba55f9aa..72bcbcb7d1 100644
--- 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/InternalSinksExtensionModuleExports.java
+++ 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/InternalSinksExtensionModuleExports.java
@@ -24,6 +24,7 @@ import 
org.apache.streampipes.extensions.api.migration.IModelMigrator;
 import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
 import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink;
 import 
org.apache.streampipes.sinks.internal.jvm.datalake.migrations.DataLakeSinkMigrationV1;
+import 
org.apache.streampipes.sinks.internal.jvm.datalake.migrations.DataLakeSinkMigrationV2;
 import 
org.apache.streampipes.sinks.internal.jvm.notification.InternalStreamPipesNotificationSink;
 
 import java.util.Collections;
@@ -45,6 +46,8 @@ public class InternalSinksExtensionModuleExports implements 
IExtensionModuleExpo
 
   @Override
   public List<IModelMigrator<?, ?>> migrators() {
-    return List.of(new DataLakeSinkMigrationV1());
+    return List.of(
+        new DataLakeSinkMigrationV1(),
+        new DataLakeSinkMigrationV2());
   }
 }
diff --git 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeDimensionProvider.java
 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeDimensionProvider.java
new file mode 100644
index 0000000000..05c02730ee
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeDimensionProvider.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.internal.jvm.datalake;
+
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.model.staticproperty.Option;
+import 
org.apache.streampipes.model.staticproperty.RuntimeResolvableAnyStaticProperty;
+import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.vocabulary.SO;
+
+import java.net.URI;
+import java.util.List;
+
+public class DataLakeDimensionProvider {
+
+  public void applyOptions(List<EventProperty> inputFields,
+                           RuntimeResolvableAnyStaticProperty staticProperty) {
+    var primitiveFields = getPrimitiveFields(inputFields);
+    primitiveFields
+        .forEach(field -> addFieldIfNotExists(field, 
staticProperty.getOptions()));
+    staticProperty.getOptions().removeIf(o -> !existsInFields(o, 
primitiveFields));
+  }
+
+  private List<EventPropertyPrimitive> getPrimitiveFields(List<EventProperty> 
inputFields) {
+    return inputFields
+        .stream()
+        .filter(field -> field instanceof EventPropertyPrimitive)
+        .filter(field -> satisfiesFilter((EventPropertyPrimitive) field))
+        .map(field -> (EventPropertyPrimitive) field)
+        .toList();
+  }
+
+  private boolean satisfiesFilter(EventPropertyPrimitive field) {
+    return !field.getRuntimeType().equals(Datatypes.Float.toString())
+        &&  
!(field.getDomainProperties().stream().map(URI::toString).toList().contains(SO.DATE_TIME));
+  }
+
+  private void addFieldIfNotExists(EventPropertyPrimitive field,
+                                   List<Option> options) {
+    if (options.stream().noneMatch(o -> 
o.getName().equals(field.getRuntimeName()))) {
+      options.add(new Option(
+          field.getRuntimeName(),
+          PropertyScope.valueOf(field.getPropertyScope()) == 
PropertyScope.DIMENSION_PROPERTY)
+      );
+    }
+  }
+
+  private boolean existsInFields(Option o,
+                                 List<EventPropertyPrimitive> primitiveFields) 
{
+    return primitiveFields.stream().anyMatch(field -> 
field.getRuntimeName().equals(o.getName()));
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index ef88bbdcdc..5417a17dd7 100644
--- 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++ 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -22,14 +22,19 @@ import 
org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataexplorer.TimeSeriesStore;
 import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
+import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import 
org.apache.streampipes.model.datalake.DataLakeMeasureSchemaUpdateStrategy;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.schema.PropertyScope;
+import 
org.apache.streampipes.model.staticproperty.RuntimeResolvableAnyStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
@@ -39,15 +44,20 @@ import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.wrapper.params.compat.SinkParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
-public class DataLakeSink extends StreamPipesDataSink {
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataLakeSink extends StreamPipesDataSink implements 
SupportsRuntimeConfig {
 
   private static final String DATABASE_MEASUREMENT_KEY = "db_measurement";
   private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping";
   public static final String SCHEMA_UPDATE_KEY = "schema_update";
+  public static final String DIMENSIONS_KEY = "dimensions_selection";
 
   public static final String SCHEMA_UPDATE_OPTION = "Update schema";
 
   public static final String EXTEND_EXISTING_SCHEMA_OPTION = "Extend existing 
schema";
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataLakeSink.class);
 
   private TimeSeriesStore timeSeriesStore;
 
@@ -55,25 +65,25 @@ public class DataLakeSink extends StreamPipesDataSink {
   @Override
   public DataSinkDescription declareModel() {
     return DataSinkBuilder
-      .create("org.apache.streampipes.sinks.internal.jvm.datalake", 1)
-      .withLocales(Locales.EN)
-      .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-      .category(DataSinkType.INTERNAL)
-      .requiredStream(StreamRequirementsBuilder
-                        .create()
-                        .requiredPropertyWithUnaryMapping(
-                          EpRequirements.timestampReq(),
-                          Labels.withId(TIMESTAMP_MAPPING_KEY),
-                          PropertyScope.NONE
-                        )
-                        .build())
-      .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
-      .requiredSingleValueSelection(
-        Labels.withId(SCHEMA_UPDATE_KEY),
-        Options.from(SCHEMA_UPDATE_OPTION, EXTEND_EXISTING_SCHEMA_OPTION)
-      )
-
-      .build();
+        .create("org.apache.streampipes.sinks.internal.jvm.datalake", 2)
+        .withLocales(Locales.EN)
+        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
+        .category(DataSinkType.INTERNAL)
+        .requiredStream(StreamRequirementsBuilder
+            .create()
+            .requiredPropertyWithUnaryMapping(
+                EpRequirements.timestampReq(),
+                Labels.withId(TIMESTAMP_MAPPING_KEY),
+                PropertyScope.NONE
+            )
+            .build())
+        .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
+        .requiredSingleValueSelection(
+            Labels.withId(SCHEMA_UPDATE_KEY),
+            Options.from(SCHEMA_UPDATE_OPTION, EXTEND_EXISTING_SCHEMA_OPTION)
+        )
+        
.requiredMultiValueSelectionFromContainer(Labels.withId(DIMENSIONS_KEY))
+        .build();
   }
 
   @Override
@@ -81,9 +91,20 @@ public class DataLakeSink extends StreamPipesDataSink {
     var extractor = parameters.extractor();
     var timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
     var measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, 
String.class);
-    var eventSchema = parameters.getInputSchemaInfos()
-                                .get(0)
-                                .getEventSchema();
+    var dimensions = extractor.selectedMultiValues(DIMENSIONS_KEY, 
String.class);
+    var eventSchema = new EventSchema(parameters.getInputSchemaInfos()
+        .get(0)
+        .getEventSchema()
+        .getEventProperties()
+        .stream()
+        .peek(ep -> {
+          if (dimensions.contains(ep.getRuntimeName())) {
+            LOG.info("Using {} as dimension", ep.getRuntimeName());
+            ep.setPropertyScope(PropertyScope.DIMENSION_PROPERTY.name());
+          }
+        })
+        .toList()
+    );
 
 
     var measure = new DataLakeMeasure(measureName, timestampField, 
eventSchema);
@@ -118,4 +139,13 @@ public class DataLakeSink extends StreamPipesDataSink {
   public void onDetach() throws SpRuntimeException {
     this.timeSeriesStore.close();
   }
+
+  @Override
+  public StaticProperty resolveConfiguration(String staticPropertyInternalName,
+                                             IStaticPropertyExtractor 
extractor) {
+    var staticProperty = extractor.getStaticPropertyByName(DIMENSIONS_KEY, 
RuntimeResolvableAnyStaticProperty.class);
+    var inputFields = extractor.getInputEventProperties(0);
+    new DataLakeDimensionProvider().applyOptions(inputFields, staticProperty);
+    return staticProperty;
+  }
 }
diff --git 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2.java
 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2.java
new file mode 100644
index 0000000000..d47bf06037
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.internal.jvm.datalake.migrations;
+
+import 
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import 
org.apache.streampipes.model.staticproperty.RuntimeResolvableAnyStaticProperty;
+import org.apache.streampipes.sdk.helpers.Labels;
+import 
org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeDimensionProvider;
+import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink;
+
+public class DataLakeSinkMigrationV2 implements IDataSinkMigrator {
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(
+        "org.apache.streampipes.sinks.internal.jvm.datalake",
+        SpServiceTagPrefix.DATA_SINK,
+        1,
+        2
+    );
+  }
+
+  @Override
+  public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation 
element,
+                                                     
IDataSinkParameterExtractor extractor) throws RuntimeException {
+    var label = Labels.from(DataLakeSink.DIMENSIONS_KEY, "Dimensions", 
"Selected fields will be stored as dimensions.");
+    var staticProperty = new RuntimeResolvableAnyStaticProperty(
+        label.getInternalId(),
+        label.getLabel(),
+        label.getDescription()
+    );
+    var inputFields = 
element.getInputStreams().get(0).getEventSchema().getEventProperties();
+    new DataLakeDimensionProvider().applyOptions(inputFields, staticProperty);
+
+    element.getStaticProperties().add(staticProperty);
+    return MigrationResult.success(element);
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/documentation.md
 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/documentation.md
index 2d58b7e3d9..d87bc9a6e0 100644
--- 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/documentation.md
+++ 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/documentation.md
@@ -41,6 +41,16 @@ This sink requires an event that provides a timestamp value 
(a field that is mar
 
 ## Configuration
 
+### Dimensions
+
+The fields which will be stored as dimensional values in the time series 
storage. Dimensions are typically identifiers 
+such as the ID of a sensor.
+Dimensions support grouping in the data explorer, but will be converted to a 
text-based field and provide less advanced 
+filtering capabilities.
+
+Be careful when modifying dimensions of existing pipelines! This might have 
impact on how you are able to view data in 
+the data explorer due to schema incompatibilities.
+
 ### Identifier
 
 The name of the measurement (table) where the events are stored.
diff --git 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en
 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en
index 0b604bdfd9..b6ea6ef6aa 100644
--- 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en
+++ 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en
@@ -26,3 +26,6 @@ timestamp_mapping.description=The value which contains a 
timestamp
 
 schema_update.title=Schema Update
 schema_update.description=Update existing schemas with the new one or extend 
the existing schema with new properties
+
+dimensions_selection.key=Dimensions
+dimensions_selection.description=Selected fields will be stored as dimensions.
diff --git 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/test/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2Test.java
 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/test/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2Test.java
new file mode 100644
index 0000000000..69d696da07
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/test/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2Test.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.internal.jvm.datalake.migrations;
+
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.model.schema.PropertyScope;
+import 
org.apache.streampipes.model.staticproperty.RuntimeResolvableAnyStaticProperty;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+public class DataLakeSinkMigrationV2Test {
+
+  @Test
+  public void migrate() {
+    var dataLakeSinkMigrationV2 = new DataLakeSinkMigrationV2();
+
+    var stream = new SpDataStream();
+    stream.setEventSchema(makeSchema());
+    var extractor = mock(DataSinkParameterExtractor.class);
+    var invocation = new DataSinkInvocation();
+    invocation.setStaticProperties(new ArrayList<>());
+    invocation.setInputStreams(List.of(stream));
+
+    var actual = dataLakeSinkMigrationV2.migrate(invocation, extractor);
+
+    Assertions.assertTrue(actual.success());
+    Assertions.assertEquals(actual.element()
+        .getStaticProperties()
+        .size(), 1);
+    var dimensionConfig = getAnyStaticProperty(actual);
+    Assertions.assertEquals(dimensionConfig.getInternalName(), 
DataLakeSink.DIMENSIONS_KEY);
+    Assertions.assertEquals(dimensionConfig.getOptions().size(), 3);
+    Assertions.assertTrue(dimensionConfig.getOptions().get(0).isSelected());
+    Assertions.assertFalse(dimensionConfig.getOptions().get(1).isSelected());
+    Assertions.assertFalse(dimensionConfig.getOptions().get(2).isSelected());
+  }
+
+  private static RuntimeResolvableAnyStaticProperty 
getAnyStaticProperty(MigrationResult<DataSinkInvocation> actual) {
+    return (RuntimeResolvableAnyStaticProperty) actual.element()
+        .getStaticProperties()
+        .get(0);
+  }
+
+  private static EventSchema makeSchema() {
+    return new EventSchema(
+        List.of(
+            PrimitivePropertyBuilder
+                .create(Datatypes.String, "a")
+                .scope(PropertyScope.DIMENSION_PROPERTY)
+                .build(),
+            PrimitivePropertyBuilder
+                .create(Datatypes.Float, "b")
+                .scope(PropertyScope.MEASUREMENT_PROPERTY)
+                .build(),
+            PrimitivePropertyBuilder
+                .create(Datatypes.Integer, "c")
+                .scope(PropertyScope.MEASUREMENT_PROPERTY)
+                .build(),
+            PrimitivePropertyBuilder
+                .create(Datatypes.Boolean, "d")
+                .scope(PropertyScope.MEASUREMENT_PROPERTY)
+                .build()
+        )
+    );
+  }
+}
diff --git 
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java
 
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java
index 2bec1f786f..7160c68e6b 100644
--- 
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java
+++ 
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java
@@ -570,4 +570,10 @@ public abstract class AbstractParameterExtractor<T extends 
InvocableStreamPipesE
             ep.getPropertyScope() != null && 
ep.getPropertyScope().equals(scope.name()))
         .collect(Collectors.toList());
   }
+
+  @Override
+  public List<EventProperty> getInputEventProperties(int streamIndex) {
+    return !sepaElement.getInputStreams().isEmpty()
+        ? 
sepaElement.getInputStreams().get(0).getEventSchema().getEventProperties() : 
List.of();
+  }
 }

Reply via email to