This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7105d35b87 fix(#3325): compact adapters do not add a default property 
scope (#3328)
7105d35b87 is described below

commit 7105d35b8771f11965443c7d7fe5180ca76eaff0
Author: Philipp Zehnder <[email protected]>
AuthorDate: Tue Nov 12 17:59:55 2024 +0100

    fix(#3325): compact adapters do not add a default property scope (#3328)
    
    * fix(#3325): Add better logging if count of events in measurement fails
    
    * fix(#3325): Add a test to validate file stream adapter via compact API
---
 .../influx/DataLakeMeasurementCounterInflux.java   | 29 +++++++++++----
 .../query/DataLakeMeasurementCounter.java          | 43 ++++++++++++++++------
 .../fixtures/connect/compact/compactTest.csv       |  2 +
 ui/cypress/fixtures/connect/compact/fileReplay.yml | 40 ++++++++++++++++++++
 .../connect/compact/addCompactAdapter.spec.ts      | 16 ++++++++
 5 files changed, 111 insertions(+), 19 deletions(-)

diff --git 
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
 
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
index db11222324..0a58b8b29a 100644
--- 
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
+++ 
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
@@ -22,16 +22,23 @@ import 
org.apache.streampipes.dataexplorer.query.DataLakeMeasurementCounter;
 import org.apache.streampipes.model.datalake.AggregationFunction;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class DataLakeMeasurementCounterInflux extends 
DataLakeMeasurementCounter {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataLakeMeasurementCounterInflux.class);
+
   private static final String COUNT_FIELD = "count";
 
-  public DataLakeMeasurementCounterInflux(List<DataLakeMeasure> 
allMeasurements,
-                                          List<String> measurementNames) {
+  public DataLakeMeasurementCounterInflux(
+      List<DataLakeMeasure> allMeasurements,
+      List<String> measurementNames
+  ) {
     super(allMeasurements, measurementNames);
   }
 
@@ -39,15 +46,21 @@ public class DataLakeMeasurementCounterInflux extends 
DataLakeMeasurementCounter
   protected CompletableFuture<Integer> 
createQueryAsAsyncFuture(DataLakeMeasure measure) {
     return CompletableFuture.supplyAsync(() -> {
       var firstColumn = getFirstMeasurementProperty(measure);
+      if (firstColumn == null) {
+        LOG.error(
+            "Could not count events in measurement: {}, because no measurement 
property was found in event schema",
+            measure.getMeasureName()
+        );
+        return 0;
+      }
+
       var builder = DataLakeInfluxQueryBuilder
-          
.create(measure.getMeasureName()).withEndTime(System.currentTimeMillis())
+          .create(measure.getMeasureName())
+          .withEndTime(System.currentTimeMillis())
           .withAggregatedColumn(firstColumn, AggregationFunction.COUNT);
       var queryResult = new 
DataExplorerInfluxQueryExecutor().executeQuery(builder.build(), 
Optional.empty(), true);
-      if (queryResult.getTotal() > 0) {
-        return extractResult(queryResult, COUNT_FIELD);
-      } else {
-        return 0;
-      }
+
+      return queryResult.getTotal() > 0 ? extractResult(queryResult, 
COUNT_FIELD) : 0;
     });
   }
 }
diff --git 
a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
 
b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
index 844ce5b573..bc3a551ed7 100644
--- 
a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
+++ 
b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
@@ -42,8 +42,10 @@ public abstract class DataLakeMeasurementCounter implements 
IDataLakeMeasurement
   protected final List<DataLakeMeasure> allMeasurements;
   protected final List<String> measurementNames;
 
-  public DataLakeMeasurementCounter(List<DataLakeMeasure> allMeasurements,
-                                         List<String> measurementNames) {
+  public DataLakeMeasurementCounter(
+      List<DataLakeMeasure> allMeasurements,
+      List<String> measurementNames
+  ) {
     this.allMeasurements = allMeasurements;
     this.measurementNames = measurementNames;
   }
@@ -52,12 +54,14 @@ public abstract class DataLakeMeasurementCounter implements 
IDataLakeMeasurement
   public Map<String, Integer> countMeasurementSizes() {
 
     // create async futures so that count queries can be executed parallel
-    Map<String, CompletableFuture<Integer>> countQueriesFutures = 
measurementNames.stream()
+    Map<String, CompletableFuture<Integer>> countQueriesFutures = 
measurementNames
+        .stream()
         .map(this::getMeasure)
         .filter(Objects::nonNull)
         .collect(Collectors.toMap(
-            DataLakeMeasure::getMeasureName,
-            this::createQueryAsAsyncFuture)
+                     DataLakeMeasure::getMeasureName,
+                     this::createQueryAsAsyncFuture
+                 )
         );
 
     return getQueryResults(countQueriesFutures);
@@ -72,7 +76,8 @@ public abstract class DataLakeMeasurementCounter implements 
IDataLakeMeasurement
   private DataLakeMeasure getMeasure(String measureName) {
     return allMeasurements
         .stream()
-        .filter(m -> m.getMeasureName().equals(measureName))
+        .filter(m -> m.getMeasureName()
+                      .equals(measureName))
         .findFirst()
         .orElse(null);
   }
@@ -83,7 +88,7 @@ public abstract class DataLakeMeasurementCounter implements 
IDataLakeMeasurement
    * @param queryFutures A Map containing the futures of
    *                     asynchronous count queries mapped by their respective 
keys.
    * @return A Map representing the results of the queries, where each key 
corresponds to
-   *         a measure name and the value is the count result.
+   * a measure name and the value is the count result.
    */
   private Map<String, Integer> getQueryResults(Map<String, 
CompletableFuture<Integer>> queryFutures) {
     Map<String, Integer> resultPerMeasure = new HashMap<>();
@@ -106,18 +111,34 @@ public abstract class DataLakeMeasurementCounter 
implements IDataLakeMeasurement
    * @return The runtime name of the first measurement property, or null if no 
such property is found.
    */
   protected String getFirstMeasurementProperty(DataLakeMeasure measure) {
-    return measure.getEventSchema().getEventProperties()
+    var propertyRuntimeName = measure
+        .getEventSchema()
+        .getEventProperties()
         .stream()
         .filter(ep -> ep.getPropertyScope() != null
-            && 
ep.getPropertyScope().equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
+            && ep.getPropertyScope()
+                 .equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
         .map(EventProperty::getRuntimeName)
         .findFirst()
         .orElse(null);
+
+    if (propertyRuntimeName == null) {
+      LOG.error("No measurement property was found in the event schema found 
for measure {}", measure.getMeasureName());
+    }
+
+    return propertyRuntimeName;
   }
 
   protected Integer extractResult(SpQueryResult queryResult, String fieldName) 
{
-    return ((Double) (
-        
queryResult.getAllDataSeries().get(0).getRows().get(0).get(queryResult.getHeaders().indexOf(fieldName)))
+    return (
+        (Double) (
+            queryResult.getAllDataSeries()
+                       .get(0)
+                       .getRows()
+                       .get(0)
+                       .get(queryResult.getHeaders()
+                                       .indexOf(fieldName))
+        )
     ).intValue();
   }
 
diff --git a/ui/cypress/fixtures/connect/compact/compactTest.csv 
b/ui/cypress/fixtures/connect/compact/compactTest.csv
new file mode 100644
index 0000000000..6cf117f7c1
--- /dev/null
+++ b/ui/cypress/fixtures/connect/compact/compactTest.csv
@@ -0,0 +1,2 @@
+timestamp;value;temperature
+1000;1.0;0.1
diff --git a/ui/cypress/fixtures/connect/compact/fileReplay.yml 
b/ui/cypress/fixtures/connect/compact/fileReplay.yml
new file mode 100644
index 0000000000..3c57795f10
--- /dev/null
+++ b/ui/cypress/fixtures/connect/compact/fileReplay.yml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+id: sp:adapterdescription:IfmzfQ
+name: File Stream Adapter Test
+description: ''
+appId: org.apache.streampipes.connect.iiot.protocol.stream.file
+configuration:
+    - filePath: compactTest.csv
+    - replaceTimestamp:
+          - ''
+    - replayOnce: 'no'
+    - speed: keepOriginalTime
+    - delimiter: ;
+      format: CSV
+      header:
+          - Header
+schema:
+    timestamp:
+        description: ''
+        propertyScope: HEADER_PROPERTY
+        semanticType: http://schema.org/DateTime
+transform:
+    rename: {}
+    measurementUnit: {}
+createOptions:
+    persist: true
+    start: true
diff --git a/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts 
b/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts
index c19e345f8d..57d2466c15 100644
--- a/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts
+++ b/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts
@@ -19,6 +19,7 @@
 import { ConnectUtils } from '../../../support/utils/connect/ConnectUtils';
 import { CompactAdapterUtils } from 
'../../../support/utils/connect/CompactAdapterUtils';
 import { PipelineUtils } from '../../../support/utils/pipeline/PipelineUtils';
+import { FileManagementUtils } from 
'../../../support/utils/FileManagementUtils';
 
 describe('Add Compact Adapters', () => {
     beforeEach('Setup Test', () => {
@@ -96,4 +97,19 @@ describe('Add Compact Adapters', () => {
             },
         );
     });
+
+    it('Add file stream adapter via the compact API. Start Adapter with 
Pipeline', () => {
+        FileManagementUtils.addFile('connect/compact/compactTest.csv');
+
+        cy.readFile('cypress/fixtures/connect/compact/fileReplay.yml').then(
+            ymlDescription => {
+                
CompactAdapterUtils.storeCompactYmlAdapter(ymlDescription).then(
+                    () => {
+                        ConnectUtils.validateAdapterIsRunning();
+                        PipelineUtils.checkAmountOfPipelinesPipeline(1);
+                    },
+                );
+            },
+        );
+    });
 });

Reply via email to