This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch
3325-compact-adapters-do-not-add-a-default-property-scope
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3325-compact-adapters-do-not-add-a-default-property-scope by this
push:
new 5b9449b393 fix(#3325): Add better logging if count of events in
measurement fails
5b9449b393 is described below
commit 5b9449b39346eabdddf90e541018fff2439ec482
Author: Philipp Zehnder <[email protected]>
AuthorDate: Mon Nov 11 11:05:38 2024 +0100
fix(#3325): Add better logging if count of events in measurement fails
---
.../influx/DataLakeMeasurementCounterInflux.java | 29 +++++++++++----
.../query/DataLakeMeasurementCounter.java | 43 ++++++++++++++++------
2 files changed, 53 insertions(+), 19 deletions(-)
diff --git
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
index db11222324..0a58b8b29a 100644
---
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
+++
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
@@ -22,16 +22,23 @@ import
org.apache.streampipes.dataexplorer.query.DataLakeMeasurementCounter;
import org.apache.streampipes.model.datalake.AggregationFunction;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public class DataLakeMeasurementCounterInflux extends
DataLakeMeasurementCounter {
+ private static final Logger LOG =
LoggerFactory.getLogger(DataLakeMeasurementCounterInflux.class);
+
private static final String COUNT_FIELD = "count";
- public DataLakeMeasurementCounterInflux(List<DataLakeMeasure>
allMeasurements,
- List<String> measurementNames) {
+ public DataLakeMeasurementCounterInflux(
+ List<DataLakeMeasure> allMeasurements,
+ List<String> measurementNames
+ ) {
super(allMeasurements, measurementNames);
}
@@ -39,15 +46,21 @@ public class DataLakeMeasurementCounterInflux extends
DataLakeMeasurementCounter
protected CompletableFuture<Integer>
createQueryAsAsyncFuture(DataLakeMeasure measure) {
return CompletableFuture.supplyAsync(() -> {
var firstColumn = getFirstMeasurementProperty(measure);
+ if (firstColumn == null) {
+ LOG.error(
+ "Could not count events in measurement: {}, because no measurement
property was found in event schema",
+ measure.getMeasureName()
+ );
+ return 0;
+ }
+
var builder = DataLakeInfluxQueryBuilder
-
.create(measure.getMeasureName()).withEndTime(System.currentTimeMillis())
+ .create(measure.getMeasureName())
+ .withEndTime(System.currentTimeMillis())
.withAggregatedColumn(firstColumn, AggregationFunction.COUNT);
var queryResult = new
DataExplorerInfluxQueryExecutor().executeQuery(builder.build(),
Optional.empty(), true);
- if (queryResult.getTotal() > 0) {
- return extractResult(queryResult, COUNT_FIELD);
- } else {
- return 0;
- }
+
+ return queryResult.getTotal() > 0 ? extractResult(queryResult,
COUNT_FIELD) : 0;
});
}
}
diff --git
a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
index 844ce5b573..bc3a551ed7 100644
---
a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
+++
b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
@@ -42,8 +42,10 @@ public abstract class DataLakeMeasurementCounter implements
IDataLakeMeasurement
protected final List<DataLakeMeasure> allMeasurements;
protected final List<String> measurementNames;
- public DataLakeMeasurementCounter(List<DataLakeMeasure> allMeasurements,
- List<String> measurementNames) {
+ public DataLakeMeasurementCounter(
+ List<DataLakeMeasure> allMeasurements,
+ List<String> measurementNames
+ ) {
this.allMeasurements = allMeasurements;
this.measurementNames = measurementNames;
}
@@ -52,12 +54,14 @@ public abstract class DataLakeMeasurementCounter implements
IDataLakeMeasurement
public Map<String, Integer> countMeasurementSizes() {
// create async futures so that count queries can be executed parallel
- Map<String, CompletableFuture<Integer>> countQueriesFutures =
measurementNames.stream()
+ Map<String, CompletableFuture<Integer>> countQueriesFutures =
measurementNames
+ .stream()
.map(this::getMeasure)
.filter(Objects::nonNull)
.collect(Collectors.toMap(
- DataLakeMeasure::getMeasureName,
- this::createQueryAsAsyncFuture)
+ DataLakeMeasure::getMeasureName,
+ this::createQueryAsAsyncFuture
+ )
);
return getQueryResults(countQueriesFutures);
@@ -72,7 +76,8 @@ public abstract class DataLakeMeasurementCounter implements
IDataLakeMeasurement
private DataLakeMeasure getMeasure(String measureName) {
return allMeasurements
.stream()
- .filter(m -> m.getMeasureName().equals(measureName))
+ .filter(m -> m.getMeasureName()
+ .equals(measureName))
.findFirst()
.orElse(null);
}
@@ -83,7 +88,7 @@ public abstract class DataLakeMeasurementCounter implements
IDataLakeMeasurement
* @param queryFutures A Map containing the futures of
* asynchronous count queries mapped by their respective
keys.
* @return A Map representing the results of the queries, where each key
corresponds to
- * a measure name and the value is the count result.
+ * a measure name and the value is the count result.
*/
private Map<String, Integer> getQueryResults(Map<String,
CompletableFuture<Integer>> queryFutures) {
Map<String, Integer> resultPerMeasure = new HashMap<>();
@@ -106,18 +111,34 @@ public abstract class DataLakeMeasurementCounter
implements IDataLakeMeasurement
* @return The runtime name of the first measurement property, or null if no
such property is found.
*/
protected String getFirstMeasurementProperty(DataLakeMeasure measure) {
- return measure.getEventSchema().getEventProperties()
+ var propertyRuntimeName = measure
+ .getEventSchema()
+ .getEventProperties()
.stream()
.filter(ep -> ep.getPropertyScope() != null
- &&
ep.getPropertyScope().equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
+ && ep.getPropertyScope()
+ .equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
.map(EventProperty::getRuntimeName)
.findFirst()
.orElse(null);
+
+ if (propertyRuntimeName == null) {
+ LOG.error("No measurement property was found in the event schema found
for measure {}", measure.getMeasureName());
+ }
+
+ return propertyRuntimeName;
}
protected Integer extractResult(SpQueryResult queryResult, String fieldName)
{
- return ((Double) (
-
queryResult.getAllDataSeries().get(0).getRows().get(0).get(queryResult.getHeaders().indexOf(fieldName)))
+ return (
+ (Double) (
+ queryResult.getAllDataSeries()
+ .get(0)
+ .getRows()
+ .get(0)
+ .get(queryResult.getHeaders()
+ .indexOf(fieldName))
+ )
).intValue();
}