bossenti commented on code in PR #2247:
URL: https://github.com/apache/streampipes/pull/2247#discussion_r1411665411
##########
streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.java:
##########
@@ -67,7 +70,7 @@ private void connect() throws SpRuntimeException {
var databaseName = connectionSettings.getDatabaseName();
// Checking whether the database exists
- if (!InfluxRequests.databaseExists(influxDb, databaseName)) {
+ if (!influxClientProvider.databaseExists(influxDb, databaseName)) {
LOG.info("Database '" + databaseName + "' not found. Gets created ...");
createDatabase(databaseName);
Review Comment:
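Can we call `influxClientProvider.createDatabase(influxDb, databaseName)` here instead of the local `createDatabase(databaseName)`, so that it is consistent with the `databaseExists()` call above?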
##########
streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxClientProvider.java:
##########
@@ -58,7 +67,80 @@ public static InfluxDB
getInfluxDBClient(InfluxConnectionSettings settings) {
}
}
+
+ public InfluxDB getInitializedInfluxDBClient(Environment environment) {
+
+ var settings = InfluxConnectionSettings.from(environment);
+ var influxDb = InfluxClientProvider.getInfluxDBClient(settings);
+ var databaseName = settings.getDatabaseName();
+
+ // Checking, if server is available
+ var response = influxDb.ping();
+ if (response.getVersion()
+ .equalsIgnoreCase("unknown")) {
+ throw new SpRuntimeException("Could not connect to InfluxDb Server: " +
settings.getConnectionUrl());
+ }
+
+ // Checking whether the database exists
+ if (!databaseExists(influxDb, databaseName)) {
+ LOG.info("Database '" + databaseName + "' not found. Gets created ...");
+ createDatabase(influxDb, databaseName);
+ }
+
+ // setting up the database
+ influxDb.setDatabase(databaseName);
+ var batchSize = 2000;
+ var flushDuration = 500;
+ influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
+
+ return influxDb;
+ }
+
+ /**
+ * Creates a new database with the given name
+ *
+ * @param influxDb The InfluxDB client
+ * @param dbName The name of the database which should be created
+ */
+ public void createDatabase(
+ InfluxDB influxDb,
+ String dbName
+ ) throws SpRuntimeException {
+ if (!dbName.matches("^[a-zA-Z_]\\w*$")) {
+ throw new SpRuntimeException(
+ "Database name '" + dbName + "' not allowed. Allowed names:
^[a-zA-Z_][a-zA-Z0-9_]*$");
+ }
+ influxDb.query(new Query("CREATE DATABASE \"" + dbName + "\"", ""));
+ }
+
+ /**
+ * Checks whether the given database exists.
+ *
+ * @param influxDb The InfluxDB client instance
+ * @param dbName The name of the database, the method should look for
+ * @return True if the database exists, false otherwise
+ */
+ public boolean databaseExists(
+ InfluxDB influxDb,
+ String dbName
+ ) {
+ var queryResult = influxDb.query(new Query("SHOW DATABASES", ""));
+ for (List<Object> a : queryResult.getResults()
+ .get(0)
+ .getSeries()
+ .get(0)
+ .getValues()) {
+ if (a.size() > 0 && dbName.equals(a.get(0))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
private static Environment getEnvironment() {
return Environments.getEnvironment();
}
+
Review Comment:
```suggestion
```
##########
streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java:
##########
@@ -20,219 +20,161 @@
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import
org.apache.streampipes.dataexplorer.commons.influx.serializer.RawFieldSerializer;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.PrimitiveField;
+import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.vocabulary.XSD;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
-import org.influxdb.dto.Query;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class InfluxStore {
private static final Logger LOG = LoggerFactory.getLogger(InfluxStore.class);
- DataLakeMeasure measure;
- Map<String, String> sanitizedRuntimeNames = new HashMap<>();
+
+ private DataLakeMeasure measure;
+ private List<EventProperty> allEventProperties;
+ private Map<String, String> sanitizedRuntimeNames = new HashMap<>();
private InfluxDB influxDb = null;
- private RawFieldSerializer rawFieldSerializer = new RawFieldSerializer();
+ private PropertyHandler propertyHandler;
- public InfluxStore(DataLakeMeasure measure,
- InfluxConnectionSettings settings) {
- this.measure = measure;
- // store sanitized target property runtime names in local variable
- measure.getEventSchema()
- .getEventProperties()
- .forEach(ep -> sanitizedRuntimeNames.put(ep.getRuntimeName(),
- InfluxNameSanitizer.renameReservedKeywords(ep.getRuntimeName())));
-
- connect(settings);
- }
- public InfluxStore(DataLakeMeasure measure,
- Environment environment) throws SpRuntimeException {
- this(measure, InfluxConnectionSettings.from(environment));
+ public InfluxStore(
+ DataLakeMeasure measure,
+ Environment environment,
+ InfluxClientProvider influxClientProvider
+ ) throws SpRuntimeException {
+ this.measure = measure;
+ storeSanitizedRuntimeNames(measure);
+ this.allEventProperties = getAllEventPropertiesExceptTimestamp(measure);
+ influxDb = influxClientProvider.getInitializedInfluxDBClient(environment);
+ propertyHandler = new PropertyHandler();
}
/**
- * Connects to the InfluxDB Server, sets the database and initializes the
batch-behaviour
+ * Takes an StreamPipes event, transforms it to an InfluxDB point and writes
it to the InfluxDB
*
- * @throws SpRuntimeException If not connection can be established or if the
database could not
- * be found
+ * @param event The event which should be saved
+ * @throws SpRuntimeException If the column name (key-value of the event
map) is not allowed
*/
- private void connect(InfluxConnectionSettings settings) throws
SpRuntimeException {
- influxDb = InfluxClientProvider.getInfluxDBClient(settings);
+ public void onEvent(Event event) throws SpRuntimeException {
- // Checking, if server is available
- var response = influxDb.ping();
- if (response.getVersion().equalsIgnoreCase("unknown")) {
- throw new SpRuntimeException("Could not connect to InfluxDb Server: " +
settings.getConnectionUrl());
- }
+ validateInputEventAndLogMissingFields(event);
- String databaseName = settings.getDatabaseName();
- // Checking whether the database exists
- if (!InfluxRequests.databaseExists(influxDb, databaseName)) {
- LOG.info("Database '" + databaseName + "' not found. Gets created ...");
- createDatabase(databaseName);
- }
+ sanitizeRuntimeNamesInEvent(event);
+
+ var point = initializePointWithTimestamp(event);
+
+ iterateOverallEventProperties(event, point);
+
+ influxDb.write(point.build());
+ }
+
+ private void validateInputEventAndLogMissingFields(Event event) {
+ checkEventIsNotNull(event);
+
+ logMissingFields(event);
- // setting up the database
- influxDb.setDatabase(databaseName);
- var batchSize = 2000;
- var flushDuration = 500;
- influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
+ logNullFields(event);
}
/**
- * Creates a new database with the given name
- *
- * @param dbName The name of the database which should be created
+ * Logs all fields which are present in the schema, but not in the provided
event
*/
- private void createDatabase(String dbName) throws SpRuntimeException {
- if (!dbName.matches("^[a-zA-Z_]\\w*$")) {
- throw new SpRuntimeException(
- "Database name '" + dbName + "' not allowed. Allowed names:
^[a-zA-Z_][a-zA-Z0-9_]*$");
+ private void logMissingFields(Event event) {
+ var missingFields = getMissingProperties(allEventProperties, event);
+ if (missingFields.size() > 0) {
+ LOG.debug(
+ "Ignored {} fields which were present in the schema, but not in the
provided event: {}",
+ missingFields.size(),
+ String.join(", ", missingFields)
+ );
}
- influxDb.query(new Query("CREATE DATABASE \"" + dbName + "\"", ""));
}
+
/**
- * Saves an event to the connected InfluxDB database
- *
- * @param event The event which should be saved
- * @throws SpRuntimeException If the column name (key-value of the event
map) is not allowed
+ * Logs all fields that contain null values
*/
- public void onEvent(Event event) throws SpRuntimeException {
- var missingFields = new ArrayList<String>();
- var nullFields = new ArrayList<String>();
- if (event == null) {
- throw new SpRuntimeException("event is null");
- }
+ private void logNullFields(Event event) {
+ List<String> nullFields = allEventProperties
+ .stream()
+ .filter(ep -> ep instanceof EventPropertyPrimitive)
+ .filter(ep -> {
+ var runtimeName = ep.getRuntimeName();
+ var field = event.getOptionalFieldByRuntimeName(runtimeName);
+
+ return field.isPresent() && field.get()
+ .getAsPrimitive()
+ .getRawValue() == null;
+ })
+ .map(EventProperty::getRuntimeName)
+ .collect(Collectors.toList());
- // sanitize event
- for (var key : event.getRaw().keySet()) {
- if (InfluxDbReservedKeywords.KEYWORD_LIST.stream().anyMatch(k ->
k.equalsIgnoreCase(key))) {
- event.renameFieldByRuntimeName(key, key + "_");
- }
+ if (nullFields.size() > 0) {
Review Comment:
```suggestion
if (!nullFields.isEmpty()) {
```
##########
streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java:
##########
@@ -20,219 +20,161 @@
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import
org.apache.streampipes.dataexplorer.commons.influx.serializer.RawFieldSerializer;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.PrimitiveField;
+import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.vocabulary.XSD;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
-import org.influxdb.dto.Query;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class InfluxStore {
private static final Logger LOG = LoggerFactory.getLogger(InfluxStore.class);
- DataLakeMeasure measure;
- Map<String, String> sanitizedRuntimeNames = new HashMap<>();
+
+ private DataLakeMeasure measure;
+ private List<EventProperty> allEventProperties;
+ private Map<String, String> sanitizedRuntimeNames = new HashMap<>();
private InfluxDB influxDb = null;
- private RawFieldSerializer rawFieldSerializer = new RawFieldSerializer();
+ private PropertyHandler propertyHandler;
- public InfluxStore(DataLakeMeasure measure,
- InfluxConnectionSettings settings) {
- this.measure = measure;
- // store sanitized target property runtime names in local variable
- measure.getEventSchema()
- .getEventProperties()
- .forEach(ep -> sanitizedRuntimeNames.put(ep.getRuntimeName(),
- InfluxNameSanitizer.renameReservedKeywords(ep.getRuntimeName())));
-
- connect(settings);
- }
- public InfluxStore(DataLakeMeasure measure,
- Environment environment) throws SpRuntimeException {
- this(measure, InfluxConnectionSettings.from(environment));
+ public InfluxStore(
+ DataLakeMeasure measure,
+ Environment environment,
+ InfluxClientProvider influxClientProvider
+ ) throws SpRuntimeException {
+ this.measure = measure;
+ storeSanitizedRuntimeNames(measure);
+ this.allEventProperties = getAllEventPropertiesExceptTimestamp(measure);
+ influxDb = influxClientProvider.getInitializedInfluxDBClient(environment);
+ propertyHandler = new PropertyHandler();
}
/**
- * Connects to the InfluxDB Server, sets the database and initializes the
batch-behaviour
+ * Takes an StreamPipes event, transforms it to an InfluxDB point and writes
it to the InfluxDB
*
- * @throws SpRuntimeException If not connection can be established or if the
database could not
- * be found
+ * @param event The event which should be saved
+ * @throws SpRuntimeException If the column name (key-value of the event
map) is not allowed
*/
- private void connect(InfluxConnectionSettings settings) throws
SpRuntimeException {
- influxDb = InfluxClientProvider.getInfluxDBClient(settings);
+ public void onEvent(Event event) throws SpRuntimeException {
- // Checking, if server is available
- var response = influxDb.ping();
- if (response.getVersion().equalsIgnoreCase("unknown")) {
- throw new SpRuntimeException("Could not connect to InfluxDb Server: " +
settings.getConnectionUrl());
- }
+ validateInputEventAndLogMissingFields(event);
- String databaseName = settings.getDatabaseName();
- // Checking whether the database exists
- if (!InfluxRequests.databaseExists(influxDb, databaseName)) {
- LOG.info("Database '" + databaseName + "' not found. Gets created ...");
- createDatabase(databaseName);
- }
+ sanitizeRuntimeNamesInEvent(event);
+
+ var point = initializePointWithTimestamp(event);
+
+ iterateOverallEventProperties(event, point);
+
+ influxDb.write(point.build());
+ }
+
+ private void validateInputEventAndLogMissingFields(Event event) {
+ checkEventIsNotNull(event);
+
+ logMissingFields(event);
- // setting up the database
- influxDb.setDatabase(databaseName);
- var batchSize = 2000;
- var flushDuration = 500;
- influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
+ logNullFields(event);
}
/**
- * Creates a new database with the given name
- *
- * @param dbName The name of the database which should be created
+ * Logs all fields which are present in the schema, but not in the provided
event
*/
- private void createDatabase(String dbName) throws SpRuntimeException {
- if (!dbName.matches("^[a-zA-Z_]\\w*$")) {
- throw new SpRuntimeException(
- "Database name '" + dbName + "' not allowed. Allowed names:
^[a-zA-Z_][a-zA-Z0-9_]*$");
+ private void logMissingFields(Event event) {
+ var missingFields = getMissingProperties(allEventProperties, event);
+ if (missingFields.size() > 0) {
+ LOG.debug(
+ "Ignored {} fields which were present in the schema, but not in the
provided event: {}",
+ missingFields.size(),
+ String.join(", ", missingFields)
+ );
}
- influxDb.query(new Query("CREATE DATABASE \"" + dbName + "\"", ""));
}
+
/**
- * Saves an event to the connected InfluxDB database
- *
- * @param event The event which should be saved
- * @throws SpRuntimeException If the column name (key-value of the event
map) is not allowed
+ * Logs all fields that contain null values
*/
- public void onEvent(Event event) throws SpRuntimeException {
- var missingFields = new ArrayList<String>();
- var nullFields = new ArrayList<String>();
- if (event == null) {
- throw new SpRuntimeException("event is null");
- }
+ private void logNullFields(Event event) {
+ List<String> nullFields = allEventProperties
+ .stream()
+ .filter(ep -> ep instanceof EventPropertyPrimitive)
+ .filter(ep -> {
+ var runtimeName = ep.getRuntimeName();
+ var field = event.getOptionalFieldByRuntimeName(runtimeName);
+
+ return field.isPresent() && field.get()
+ .getAsPrimitive()
+ .getRawValue() == null;
+ })
+ .map(EventProperty::getRuntimeName)
+ .collect(Collectors.toList());
- // sanitize event
- for (var key : event.getRaw().keySet()) {
- if (InfluxDbReservedKeywords.KEYWORD_LIST.stream().anyMatch(k ->
k.equalsIgnoreCase(key))) {
- event.renameFieldByRuntimeName(key, key + "_");
- }
+ if (nullFields.size() > 0) {
+ LOG.warn("Ignored {} fields which had a value 'null': {}",
nullFields.size(), String.join(", ", nullFields));
}
+ }
- var timestampValue =
event.getFieldBySelector(measure.getTimestampField()).getAsPrimitive().getAsLong();
- var point =
- Point.measurement(measure.getMeasureName()).time((long)
timestampValue, TimeUnit.MILLISECONDS);
+ private void iterateOverallEventProperties(
+ Event event,
+ Point.Builder point
+ ) {
- for (var ep : measure.getEventSchema().getEventProperties()) {
+ allEventProperties.forEach(ep -> {
var runtimeName = ep.getRuntimeName();
- // timestamp should not be added as a field
- if (!measure.getTimestampField().endsWith(runtimeName)) {
- var sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName);
- var field = event.getOptionalFieldByRuntimeName(runtimeName);
- try {
- if (ep instanceof EventPropertyPrimitive) {
- if (field.isPresent()) {
- var eventPropertyPrimitiveField = field.get().getAsPrimitive();
- if (eventPropertyPrimitiveField.getRawValue() == null) {
- nullFields.add(sanitizedRuntimeName);
- } else {
-
- // store property as tag when the field is a dimension property
- if
(PropertyScope.DIMENSION_PROPERTY.name().equals(ep.getPropertyScope())) {
- point.tag(sanitizedRuntimeName,
eventPropertyPrimitiveField.getAsString());
- } else {
- handleMeasurementProperty(
- point,
- (EventPropertyPrimitive) ep,
- sanitizedRuntimeName,
- eventPropertyPrimitiveField);
- }
- }
- } else {
- missingFields.add(runtimeName);
- }
- } else {
- // Since InfluxDB can't store non-primitive types, store them as
string
- // and deserialize later in downstream processes
- if (field.isPresent()) {
- handleNonPrimitiveMeasurementProperty(point, event,
sanitizedRuntimeName);
- } else {
- missingFields.add(runtimeName);
- }
- }
- } catch (SpRuntimeException iae) {
- LOG.warn("Runtime exception while extracting field value of field {}
- this field will be ignored",
- runtimeName, iae);
+ var sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName);
+ var fieldOptional = event.getOptionalFieldByRuntimeName(runtimeName);
+
+ fieldOptional.ifPresent(field -> {
+ if (ep instanceof EventPropertyPrimitive) {
+ propertyHandler.handlePrimitiveProperty(
+ point,
+ (EventPropertyPrimitive) ep,
+ field.getAsPrimitive(),
+ sanitizedRuntimeName
+ );
+ } else {
+ propertyHandler.handleNonPrimitiveProperty(
+ point,
+ event,
+ sanitizedRuntimeName
+ );
}
- }
- }
-
- if (missingFields.size() > 0) {
- LOG.debug("Ignored {} fields which were present in the schema, but not
in the provided event: {}",
- missingFields.size(),
- String.join(", ", missingFields));
- }
+ });
+ });
+ }
- if (nullFields.size() > 0) {
- LOG.warn("Ignored {} fields which had a value 'null': {}",
nullFields.size(), String.join(", ", nullFields));
- }
- influxDb.write(point.build());
+ /**
+ * Returns a list of the runtime names that are missing within the event
+ */
+ private List<String> getMissingProperties(
+ List<EventProperty> allEventProperties,
+ Event event
+ ) {
+ return allEventProperties.stream()
+ .map(EventProperty::getRuntimeName)
+ .filter(runtimeName ->
!event.getOptionalFieldByRuntimeName(runtimeName)
+ .isPresent())
Review Comment:
```suggestion
.isEmpty())
```
##########
streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java:
##########
@@ -20,219 +20,161 @@
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import
org.apache.streampipes.dataexplorer.commons.influx.serializer.RawFieldSerializer;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.PrimitiveField;
+import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.vocabulary.XSD;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
-import org.influxdb.dto.Query;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class InfluxStore {
private static final Logger LOG = LoggerFactory.getLogger(InfluxStore.class);
- DataLakeMeasure measure;
- Map<String, String> sanitizedRuntimeNames = new HashMap<>();
+
+ private DataLakeMeasure measure;
+ private List<EventProperty> allEventProperties;
+ private Map<String, String> sanitizedRuntimeNames = new HashMap<>();
private InfluxDB influxDb = null;
- private RawFieldSerializer rawFieldSerializer = new RawFieldSerializer();
+ private PropertyHandler propertyHandler;
- public InfluxStore(DataLakeMeasure measure,
- InfluxConnectionSettings settings) {
- this.measure = measure;
- // store sanitized target property runtime names in local variable
- measure.getEventSchema()
- .getEventProperties()
- .forEach(ep -> sanitizedRuntimeNames.put(ep.getRuntimeName(),
- InfluxNameSanitizer.renameReservedKeywords(ep.getRuntimeName())));
-
- connect(settings);
- }
- public InfluxStore(DataLakeMeasure measure,
- Environment environment) throws SpRuntimeException {
- this(measure, InfluxConnectionSettings.from(environment));
+ public InfluxStore(
+ DataLakeMeasure measure,
+ Environment environment,
+ InfluxClientProvider influxClientProvider
+ ) throws SpRuntimeException {
+ this.measure = measure;
+ storeSanitizedRuntimeNames(measure);
+ this.allEventProperties = getAllEventPropertiesExceptTimestamp(measure);
+ influxDb = influxClientProvider.getInitializedInfluxDBClient(environment);
+ propertyHandler = new PropertyHandler();
}
/**
- * Connects to the InfluxDB Server, sets the database and initializes the
batch-behaviour
+ * Takes an StreamPipes event, transforms it to an InfluxDB point and writes
it to the InfluxDB
*
- * @throws SpRuntimeException If not connection can be established or if the
database could not
- * be found
+ * @param event The event which should be saved
+ * @throws SpRuntimeException If the column name (key-value of the event
map) is not allowed
*/
- private void connect(InfluxConnectionSettings settings) throws
SpRuntimeException {
- influxDb = InfluxClientProvider.getInfluxDBClient(settings);
+ public void onEvent(Event event) throws SpRuntimeException {
- // Checking, if server is available
- var response = influxDb.ping();
- if (response.getVersion().equalsIgnoreCase("unknown")) {
- throw new SpRuntimeException("Could not connect to InfluxDb Server: " +
settings.getConnectionUrl());
- }
+ validateInputEventAndLogMissingFields(event);
- String databaseName = settings.getDatabaseName();
- // Checking whether the database exists
- if (!InfluxRequests.databaseExists(influxDb, databaseName)) {
- LOG.info("Database '" + databaseName + "' not found. Gets created ...");
- createDatabase(databaseName);
- }
+ sanitizeRuntimeNamesInEvent(event);
+
+ var point = initializePointWithTimestamp(event);
+
+ iterateOverallEventProperties(event, point);
+
+ influxDb.write(point.build());
+ }
+
+ private void validateInputEventAndLogMissingFields(Event event) {
+ checkEventIsNotNull(event);
+
+ logMissingFields(event);
- // setting up the database
- influxDb.setDatabase(databaseName);
- var batchSize = 2000;
- var flushDuration = 500;
- influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
+ logNullFields(event);
}
/**
- * Creates a new database with the given name
- *
- * @param dbName The name of the database which should be created
+ * Logs all fields which are present in the schema, but not in the provided
event
*/
- private void createDatabase(String dbName) throws SpRuntimeException {
- if (!dbName.matches("^[a-zA-Z_]\\w*$")) {
- throw new SpRuntimeException(
- "Database name '" + dbName + "' not allowed. Allowed names:
^[a-zA-Z_][a-zA-Z0-9_]*$");
+ private void logMissingFields(Event event) {
+ var missingFields = getMissingProperties(allEventProperties, event);
+ if (missingFields.size() > 0) {
Review Comment:
```suggestion
if (!missingFields.isEmpty()) {
```
##########
streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxClientProvider.java:
##########
@@ -58,7 +67,80 @@ public static InfluxDB
getInfluxDBClient(InfluxConnectionSettings settings) {
}
}
+
+ public InfluxDB getInitializedInfluxDBClient(Environment environment) {
+
+ var settings = InfluxConnectionSettings.from(environment);
+ var influxDb = InfluxClientProvider.getInfluxDBClient(settings);
+ var databaseName = settings.getDatabaseName();
+
+ // Checking, if server is available
+ var response = influxDb.ping();
+ if (response.getVersion()
+ .equalsIgnoreCase("unknown")) {
+ throw new SpRuntimeException("Could not connect to InfluxDb Server: " +
settings.getConnectionUrl());
+ }
+
+ // Checking whether the database exists
+ if (!databaseExists(influxDb, databaseName)) {
+ LOG.info("Database '" + databaseName + "' not found. Gets created ...");
+ createDatabase(influxDb, databaseName);
+ }
+
+ // setting up the database
+ influxDb.setDatabase(databaseName);
+ var batchSize = 2000;
+ var flushDuration = 500;
+ influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
+
+ return influxDb;
+ }
+
+ /**
+ * Creates a new database with the given name
+ *
+ * @param influxDb The InfluxDB client
+ * @param dbName The name of the database which should be created
+ */
+ public void createDatabase(
+ InfluxDB influxDb,
+ String dbName
+ ) throws SpRuntimeException {
+ if (!dbName.matches("^[a-zA-Z_]\\w*$")) {
+ throw new SpRuntimeException(
+ "Database name '" + dbName + "' not allowed. Allowed names:
^[a-zA-Z_][a-zA-Z0-9_]*$");
+ }
+ influxDb.query(new Query("CREATE DATABASE \"" + dbName + "\"", ""));
+ }
+
+ /**
+ * Checks whether the given database exists.
+ *
+ * @param influxDb The InfluxDB client instance
+ * @param dbName The name of the database, the method should look for
+ * @return True if the database exists, false otherwise
+ */
+ public boolean databaseExists(
+ InfluxDB influxDb,
+ String dbName
+ ) {
+ var queryResult = influxDb.query(new Query("SHOW DATABASES", ""));
+ for (List<Object> a : queryResult.getResults()
+ .get(0)
+ .getSeries()
+ .get(0)
+ .getValues()) {
+ if (a.size() > 0 && dbName.equals(a.get(0))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
private static Environment getEnvironment() {
return Environments.getEnvironment();
}
+
+
Review Comment:
```suggestion
```
##########
streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java:
##########
@@ -20,219 +20,161 @@
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import
org.apache.streampipes.dataexplorer.commons.influx.serializer.RawFieldSerializer;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.PrimitiveField;
+import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.vocabulary.XSD;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
-import org.influxdb.dto.Query;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class InfluxStore {
private static final Logger LOG = LoggerFactory.getLogger(InfluxStore.class);
- DataLakeMeasure measure;
- Map<String, String> sanitizedRuntimeNames = new HashMap<>();
+
+ private DataLakeMeasure measure;
+ private List<EventProperty> allEventProperties;
+ private Map<String, String> sanitizedRuntimeNames = new HashMap<>();
private InfluxDB influxDb = null;
- private RawFieldSerializer rawFieldSerializer = new RawFieldSerializer();
+ private PropertyHandler propertyHandler;
- public InfluxStore(DataLakeMeasure measure,
- InfluxConnectionSettings settings) {
- this.measure = measure;
- // store sanitized target property runtime names in local variable
- measure.getEventSchema()
- .getEventProperties()
- .forEach(ep -> sanitizedRuntimeNames.put(ep.getRuntimeName(),
- InfluxNameSanitizer.renameReservedKeywords(ep.getRuntimeName())));
-
- connect(settings);
- }
- public InfluxStore(DataLakeMeasure measure,
- Environment environment) throws SpRuntimeException {
- this(measure, InfluxConnectionSettings.from(environment));
+ public InfluxStore(
+ DataLakeMeasure measure,
+ Environment environment,
+ InfluxClientProvider influxClientProvider
+ ) throws SpRuntimeException {
+ this.measure = measure;
+ storeSanitizedRuntimeNames(measure);
+ this.allEventProperties = getAllEventPropertiesExceptTimestamp(measure);
Review Comment:
```suggestion
allEventProperties = getAllEventPropertiesExceptTimestamp(measure);
```
##########
streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java:
##########
@@ -20,219 +20,161 @@
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import
org.apache.streampipes.dataexplorer.commons.influx.serializer.RawFieldSerializer;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.PrimitiveField;
+import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.vocabulary.XSD;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
-import org.influxdb.dto.Query;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class InfluxStore {
private static final Logger LOG = LoggerFactory.getLogger(InfluxStore.class);
- DataLakeMeasure measure;
- Map<String, String> sanitizedRuntimeNames = new HashMap<>();
+
+ private DataLakeMeasure measure;
+ private List<EventProperty> allEventProperties;
+ private Map<String, String> sanitizedRuntimeNames = new HashMap<>();
private InfluxDB influxDb = null;
Review Comment:
```suggestion
private final InfluxDB influxDb;
```
##########
streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java:
##########
@@ -20,219 +20,161 @@
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import
org.apache.streampipes.dataexplorer.commons.influx.serializer.RawFieldSerializer;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.PrimitiveField;
+import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.vocabulary.XSD;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
-import org.influxdb.dto.Query;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class InfluxStore {
private static final Logger LOG = LoggerFactory.getLogger(InfluxStore.class);
- DataLakeMeasure measure;
- Map<String, String> sanitizedRuntimeNames = new HashMap<>();
+
+ private DataLakeMeasure measure;
+ private List<EventProperty> allEventProperties;
+ private Map<String, String> sanitizedRuntimeNames = new HashMap<>();
private InfluxDB influxDb = null;
- private RawFieldSerializer rawFieldSerializer = new RawFieldSerializer();
+ private PropertyHandler propertyHandler;
- public InfluxStore(DataLakeMeasure measure,
- InfluxConnectionSettings settings) {
- this.measure = measure;
- // store sanitized target property runtime names in local variable
- measure.getEventSchema()
- .getEventProperties()
- .forEach(ep -> sanitizedRuntimeNames.put(ep.getRuntimeName(),
- InfluxNameSanitizer.renameReservedKeywords(ep.getRuntimeName())));
-
- connect(settings);
- }
- public InfluxStore(DataLakeMeasure measure,
- Environment environment) throws SpRuntimeException {
- this(measure, InfluxConnectionSettings.from(environment));
+ public InfluxStore(
+ DataLakeMeasure measure,
+ Environment environment,
+ InfluxClientProvider influxClientProvider
+ ) throws SpRuntimeException {
+ this.measure = measure;
+ storeSanitizedRuntimeNames(measure);
+ this.allEventProperties = getAllEventPropertiesExceptTimestamp(measure);
+ influxDb = influxClientProvider.getInitializedInfluxDBClient(environment);
+ propertyHandler = new PropertyHandler();
}
/**
- * Connects to the InfluxDB Server, sets the database and initializes the
batch-behaviour
+ * Takes an StreamPipes event, transforms it to an InfluxDB point and writes
it to the InfluxDB
*
- * @throws SpRuntimeException If not connection can be established or if the
database could not
- * be found
+ * @param event The event which should be saved
+ * @throws SpRuntimeException If the column name (key-value of the event
map) is not allowed
*/
- private void connect(InfluxConnectionSettings settings) throws
SpRuntimeException {
- influxDb = InfluxClientProvider.getInfluxDBClient(settings);
+ public void onEvent(Event event) throws SpRuntimeException {
- // Checking, if server is available
- var response = influxDb.ping();
- if (response.getVersion().equalsIgnoreCase("unknown")) {
- throw new SpRuntimeException("Could not connect to InfluxDb Server: " +
settings.getConnectionUrl());
- }
+ validateInputEventAndLogMissingFields(event);
- String databaseName = settings.getDatabaseName();
- // Checking whether the database exists
- if (!InfluxRequests.databaseExists(influxDb, databaseName)) {
- LOG.info("Database '" + databaseName + "' not found. Gets created ...");
- createDatabase(databaseName);
- }
+ sanitizeRuntimeNamesInEvent(event);
+
+ var point = initializePointWithTimestamp(event);
+
+ iterateOverallEventProperties(event, point);
+
+ influxDb.write(point.build());
+ }
+
+ private void validateInputEventAndLogMissingFields(Event event) {
+ checkEventIsNotNull(event);
+
+ logMissingFields(event);
- // setting up the database
- influxDb.setDatabase(databaseName);
- var batchSize = 2000;
- var flushDuration = 500;
- influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
+ logNullFields(event);
}
/**
- * Creates a new database with the given name
- *
- * @param dbName The name of the database which should be created
+ * Logs all fields which are present in the schema, but not in the provided
event
*/
- private void createDatabase(String dbName) throws SpRuntimeException {
- if (!dbName.matches("^[a-zA-Z_]\\w*$")) {
- throw new SpRuntimeException(
- "Database name '" + dbName + "' not allowed. Allowed names:
^[a-zA-Z_][a-zA-Z0-9_]*$");
+ private void logMissingFields(Event event) {
+ var missingFields = getMissingProperties(allEventProperties, event);
+ if (missingFields.size() > 0) {
+ LOG.debug(
+ "Ignored {} fields which were present in the schema, but not in the
provided event: {}",
+ missingFields.size(),
+ String.join(", ", missingFields)
+ );
}
- influxDb.query(new Query("CREATE DATABASE \"" + dbName + "\"", ""));
}
+
/**
- * Saves an event to the connected InfluxDB database
- *
- * @param event The event which should be saved
- * @throws SpRuntimeException If the column name (key-value of the event
map) is not allowed
+ * Logs all fields that contain null values
*/
- public void onEvent(Event event) throws SpRuntimeException {
- var missingFields = new ArrayList<String>();
- var nullFields = new ArrayList<String>();
- if (event == null) {
- throw new SpRuntimeException("event is null");
- }
+ private void logNullFields(Event event) {
+ List<String> nullFields = allEventProperties
+ .stream()
+ .filter(ep -> ep instanceof EventPropertyPrimitive)
+ .filter(ep -> {
+ var runtimeName = ep.getRuntimeName();
+ var field = event.getOptionalFieldByRuntimeName(runtimeName);
+
+ return field.isPresent() && field.get()
+ .getAsPrimitive()
+ .getRawValue() == null;
+ })
+ .map(EventProperty::getRuntimeName)
+ .collect(Collectors.toList());
- // sanitize event
- for (var key : event.getRaw().keySet()) {
- if (InfluxDbReservedKeywords.KEYWORD_LIST.stream().anyMatch(k ->
k.equalsIgnoreCase(key))) {
- event.renameFieldByRuntimeName(key, key + "_");
- }
+ if (nullFields.size() > 0) {
+ LOG.warn("Ignored {} fields which had a value 'null': {}",
nullFields.size(), String.join(", ", nullFields));
}
+ }
- var timestampValue =
event.getFieldBySelector(measure.getTimestampField()).getAsPrimitive().getAsLong();
- var point =
- Point.measurement(measure.getMeasureName()).time((long)
timestampValue, TimeUnit.MILLISECONDS);
+ private void iterateOverallEventProperties(
+ Event event,
+ Point.Builder point
+ ) {
- for (var ep : measure.getEventSchema().getEventProperties()) {
+ allEventProperties.forEach(ep -> {
var runtimeName = ep.getRuntimeName();
- // timestamp should not be added as a field
- if (!measure.getTimestampField().endsWith(runtimeName)) {
- var sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName);
- var field = event.getOptionalFieldByRuntimeName(runtimeName);
- try {
- if (ep instanceof EventPropertyPrimitive) {
- if (field.isPresent()) {
- var eventPropertyPrimitiveField = field.get().getAsPrimitive();
- if (eventPropertyPrimitiveField.getRawValue() == null) {
- nullFields.add(sanitizedRuntimeName);
- } else {
-
- // store property as tag when the field is a dimension property
- if
(PropertyScope.DIMENSION_PROPERTY.name().equals(ep.getPropertyScope())) {
- point.tag(sanitizedRuntimeName,
eventPropertyPrimitiveField.getAsString());
- } else {
- handleMeasurementProperty(
- point,
- (EventPropertyPrimitive) ep,
- sanitizedRuntimeName,
- eventPropertyPrimitiveField);
- }
- }
- } else {
- missingFields.add(runtimeName);
- }
- } else {
- // Since InfluxDB can't store non-primitive types, store them as
string
- // and deserialize later in downstream processes
- if (field.isPresent()) {
- handleNonPrimitiveMeasurementProperty(point, event,
sanitizedRuntimeName);
- } else {
- missingFields.add(runtimeName);
- }
- }
- } catch (SpRuntimeException iae) {
- LOG.warn("Runtime exception while extracting field value of field {}
- this field will be ignored",
- runtimeName, iae);
+ var sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName);
+ var fieldOptional = event.getOptionalFieldByRuntimeName(runtimeName);
+
+ fieldOptional.ifPresent(field -> {
+ if (ep instanceof EventPropertyPrimitive) {
+ propertyHandler.handlePrimitiveProperty(
+ point,
+ (EventPropertyPrimitive) ep,
+ field.getAsPrimitive(),
+ sanitizedRuntimeName
+ );
+ } else {
+ propertyHandler.handleNonPrimitiveProperty(
+ point,
+ event,
+ sanitizedRuntimeName
+ );
}
- }
- }
-
- if (missingFields.size() > 0) {
- LOG.debug("Ignored {} fields which were present in the schema, but not
in the provided event: {}",
- missingFields.size(),
- String.join(", ", missingFields));
- }
+ });
+ });
+ }
- if (nullFields.size() > 0) {
- LOG.warn("Ignored {} fields which had a value 'null': {}",
nullFields.size(), String.join(", ", nullFields));
- }
- influxDb.write(point.build());
+ /**
+ * Returns a list of the runtime names that are missing within the event
+ */
+ private List<String> getMissingProperties(
+ List<EventProperty> allEventProperties,
+ Event event
+ ) {
+ return allEventProperties.stream()
+ .map(EventProperty::getRuntimeName)
+ .filter(runtimeName ->
!event.getOptionalFieldByRuntimeName(runtimeName)
Review Comment:
```suggestion
.filter(runtimeName ->
event.getOptionalFieldByRuntimeName(runtimeName)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]