This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 9a8ef70cd5 2354 harmonize influx store and influx db client (#3154)
9a8ef70cd5 is described below
commit 9a8ef70cd530b332937762307991c49cee713a81
Author: Isaak Krut <[email protected]>
AuthorDate: Tue Aug 27 12:44:10 2024 -0400
2354 harmonize influx store and influx db client (#3154)
* #2354 refactored both InfluxDbClient classes and TimeSeriesStorageInflux
* #2354 cleanup
* #2354 cleanup
* #2354 cleanup
* #2354 cleanup
---
.../influx/TimeSeriesStorageInflux.java | 2 +-
.../influx/client/InfluxClientProvider.java | 75 +++++++++++++++++-----
.../influx/TimeSeriesStorageInfluxTest.java | 3 +-
.../connectors/influx/adapter/InfluxDbClient.java | 2 +-
.../influx/shared/SharedInfluxClient.java | 15 +++--
.../connectors/influx/sink/InfluxDbClient.java | 24 +------
.../connectors/influx/sink/InfluxDbSink.java | 2 +-
7 files changed, 73 insertions(+), 50 deletions(-)
diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInflux.java b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInflux.java
index 8c2f674a99..fa35cbe9b9 100644
--- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInflux.java
+++ b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInflux.java
@@ -45,7 +45,7 @@ public class TimeSeriesStorageInflux extends TimeSeriesStorage {
InfluxClientProvider influxClientProvider
) throws SpRuntimeException {
super(measure);
- influxDb = influxClientProvider.getInitializedInfluxDBClient(environment);
+ this.influxDb = influxClientProvider.getSetUpInfluxDBClient(environment);
propertyHandler = new PropertyHandler();
}
diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/client/InfluxClientProvider.java b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/client/InfluxClientProvider.java
index f1b47285b4..9f23cee0a6 100644
--- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/client/InfluxClientProvider.java
+++ b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/client/InfluxClientProvider.java
@@ -34,8 +34,50 @@ import java.util.concurrent.TimeUnit;
public class InfluxClientProvider {
+ private static final int DEFAULT_BATCH_SIZE = 2000;
+ private static final int DEFAULT_FLUSH_DURATION = 500;
+
private static final Logger LOG = LoggerFactory.getLogger(InfluxClientProvider.class);
+ /**
+ * Create a new InfluxDB client from Environment and ensures database is available
+ * @param environment Environment
+ * @return InfluxDB
+ */
+ public InfluxDB getSetUpInfluxDBClient(Environment environment){
+ return getSetUpInfluxDBClient(InfluxConnectionSettings.from(environment));
+ }
+
+ /**
+ * Create a new InfluxDB client from Connection Settings and ensures database is available
+ * @param settings Connection Settings
+ * @return InfluxDB
+ */
+ public InfluxDB getSetUpInfluxDBClient(InfluxConnectionSettings settings){
+ var influxDb = getInitializedInfluxDBClient(settings);
+ this.setupDatabaseAndBatching(influxDb, settings.getDatabaseName());
+
+ return influxDb;
+ }
+
+ /**
+ * Create a new InfluxDB client from provided settings and verify it's available
+ * @param settings Connection settings
+ * @return InfluxDB
+ */
+ public InfluxDB getInitializedInfluxDBClient(InfluxConnectionSettings settings){
+ var influxDb = InfluxClientProvider.getInfluxDBClient(settings);
+
+ // Checking, if server is available
+ var response = influxDb.ping();
+ if (response.getVersion()
+ .equalsIgnoreCase("unknown")) {
+ throw new SpRuntimeException("Could not connect to InfluxDb Server: " + settings.getConnectionUrl());
+ }
+
+ return influxDb;
+ }
+
/**
* Create a new InfluxDB client from environment variables
*
@@ -68,20 +110,23 @@ public class InfluxClientProvider {
}
}
+ /**
+ * Creates the specified database in the influxDb instance if it does not exist. Enables batching with default values
+ * @param influxDb The InfluxDB client instance
+ * @param databaseName The name of the database
+ */
+ public void setupDatabaseAndBatching(InfluxDB influxDb, String databaseName) {
+ this.setupDatabaseAndBatching(influxDb, databaseName, DEFAULT_BATCH_SIZE, DEFAULT_FLUSH_DURATION);
+ }
- public InfluxDB getInitializedInfluxDBClient(Environment environment) {
-
- var settings = InfluxConnectionSettings.from(environment);
- var influxDb = InfluxClientProvider.getInfluxDBClient(settings);
- var databaseName = settings.getDatabaseName();
-
- // Checking, if server is available
- var response = influxDb.ping();
- if (response.getVersion()
- .equalsIgnoreCase("unknown")) {
- throw new SpRuntimeException("Could not connect to InfluxDb Server: " + settings.getConnectionUrl());
- }
-
+ /**
+ * Creates the specified database in the influxDb instance if it does not exist. Enables batching
+ * @param influxDb The InfluxDB client instance
+ * @param databaseName The name of the database
+ * @param batchSize Batch Size
+ * @param flushDuration Flush Duration
+ */
+ public void setupDatabaseAndBatching(InfluxDB influxDb, String databaseName, int batchSize, int flushDuration) {
// Checking whether the database exists
if (!databaseExists(influxDb, databaseName)) {
LOG.info("Database '" + databaseName + "' not found. Gets created ...");
@@ -90,11 +135,7 @@ public class InfluxClientProvider {
// setting up the database
influxDb.setDatabase(databaseName);
- var batchSize = 2000;
- var flushDuration = 500;
influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
-
- return influxDb;
}
/**
diff --git a/streampipes-data-explorer-influx/src/test/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInfluxTest.java b/streampipes-data-explorer-influx/src/test/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInfluxTest.java
index 3ddd6f7007..d8e95bb1b0 100644
--- a/streampipes-data-explorer-influx/src/test/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInfluxTest.java
+++ b/streampipes-data-explorer-influx/src/test/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInfluxTest.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.dataexplorer.influx;
+import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataexplorer.influx.client.InfluxClientProvider;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
@@ -383,7 +384,7 @@ public class TimeSeriesStorageInfluxTest {
);
var influxClientProviderMock = Mockito.mock(InfluxClientProvider.class);
- Mockito.when(influxClientProviderMock.getInitializedInfluxDBClient(ArgumentMatchers.any()))
+ Mockito.when(influxClientProviderMock.getSetUpInfluxDBClient((Environment) ArgumentMatchers.any()))
.thenReturn(influxDBMock);
return new TimeSeriesStorageInflux(measure, null, influxClientProviderMock);
diff --git a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/adapter/InfluxDbClient.java b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/adapter/InfluxDbClient.java
index d23af15257..6c83056cd4 100644
--- a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/adapter/InfluxDbClient.java
+++ b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/adapter/InfluxDbClient.java
@@ -109,7 +109,7 @@ public class InfluxDbClient extends SharedInfluxClient {
public void disconnect() {
if (connected) {
- influxDb.close();
+ super.disconnect();
connected = false;
}
}
diff --git a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/shared/SharedInfluxClient.java b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/shared/SharedInfluxClient.java
index c455f31aa9..69a3bb50ba 100644
--- a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/shared/SharedInfluxClient.java
+++ b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/shared/SharedInfluxClient.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.dataexplorer.influx.client.InfluxClientProvider;
import org.apache.streampipes.dataexplorer.influx.client.InfluxConnectionSettings;
import org.influxdb.InfluxDB;
-import org.influxdb.dto.Pong;
public abstract class SharedInfluxClient {
@@ -42,12 +41,14 @@ public abstract class SharedInfluxClient {
protected void initClient() throws SpRuntimeException {
- this.influxDb = InfluxClientProvider.getInfluxDBClient(connectionSettings);
+ InfluxClientProvider influxClientProvider = new InfluxClientProvider();
+ this.influxDb = influxClientProvider.getInitializedInfluxDBClient(connectionSettings);
+ }
- // Checking, if server is available
- Pong response = influxDb.ping();
- if (response.getVersion().equalsIgnoreCase("unknown")) {
- throw new SpRuntimeException("Could not connect to InfluxDb Server: " + connectionSettings.getConnectionUrl());
- }
+ /**
+ * Shuts down the connection to the InfluxDB server
+ */
+ public void disconnect() {
+ influxDb.close();
}
}
diff --git a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.java b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.java
index 9c6f49f411..2a33f2cd41 100644
--- a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.java
+++ b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.java
@@ -24,17 +24,13 @@ import org.apache.streampipes.dataexplorer.influx.client.InfluxConnectionSetting
import org.apache.streampipes.extensions.connectors.influx.shared.SharedInfluxClient;
import org.apache.streampipes.model.runtime.Event;
-import org.influxdb.BatchOptions;
import org.influxdb.dto.Point;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class InfluxDbClient extends SharedInfluxClient {
- private static final Logger LOG = LoggerFactory.getLogger(InfluxDbClient.class);
private final String timestampField;
private final Integer batchSize;
@@ -66,17 +62,8 @@ public class InfluxDbClient extends SharedInfluxClient {
*/
private void connect() throws SpRuntimeException {
super.initClient();
- var databaseName = connectionSettings.getDatabaseName();
-
- // Checking whether the database exists
- if (!influxClientProvider.databaseExists(influxDb, databaseName)) {
- LOG.info("Database '" + databaseName + "' not found. Gets created ...");
- influxClientProvider.createDatabase(influxDb, databaseName);
- }
-
- // setting up the database
- influxDb.setDatabase(databaseName);
- influxDb.enableBatch(BatchOptions.DEFAULTS.actions(batchSize).flushDuration(flushDuration));
+ influxClientProvider.setupDatabaseAndBatching(
+ influxDb, connectionSettings.getDatabaseName(), batchSize, flushDuration);
}
/**
@@ -107,11 +94,4 @@ public class InfluxDbClient extends SharedInfluxClient {
influxDb.write(p.build());
}
-
- /**
- * Shuts down the connection to the InfluxDB server
- */
- void stop() {
- influxDb.close();
- }
}
diff --git a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
index 18f3d2fb54..3da9bc629c 100644
--- a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
+++ b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
@@ -83,7 +83,7 @@ public class InfluxDbSink extends StreamPipesDataSink {
@Override
public void onDetach() throws SpRuntimeException {
- influxDbClient.stop();
+ influxDbClient.disconnect();
}
public static String prepareString(String s) {