This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9a8ef70cd5 2354 harmonize influx store and influx db client (#3154)
9a8ef70cd5 is described below

commit 9a8ef70cd530b332937762307991c49cee713a81
Author: Isaak Krut <[email protected]>
AuthorDate: Tue Aug 27 12:44:10 2024 -0400

    2354 harmonize influx store and influx db client (#3154)
    
    * #2354 refactored both InfluxDbClient classes and TimeSeriesStorageInflux
    
    * #2354 cleanup
    
    * #2354 cleanup
    
    * #2354 cleanup
    
    * #2354 cleanup
---
 .../influx/TimeSeriesStorageInflux.java            |  2 +-
 .../influx/client/InfluxClientProvider.java        | 75 +++++++++++++++++-----
 .../influx/TimeSeriesStorageInfluxTest.java        |  3 +-
 .../connectors/influx/adapter/InfluxDbClient.java  |  2 +-
 .../influx/shared/SharedInfluxClient.java          | 15 +++--
 .../connectors/influx/sink/InfluxDbClient.java     | 24 +------
 .../connectors/influx/sink/InfluxDbSink.java       |  2 +-
 7 files changed, 73 insertions(+), 50 deletions(-)

diff --git 
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInflux.java
 
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInflux.java
index 8c2f674a99..fa35cbe9b9 100644
--- 
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInflux.java
+++ 
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInflux.java
@@ -45,7 +45,7 @@ public class TimeSeriesStorageInflux extends 
TimeSeriesStorage {
       InfluxClientProvider influxClientProvider
   ) throws SpRuntimeException {
     super(measure);
-    influxDb = influxClientProvider.getInitializedInfluxDBClient(environment);
+    this.influxDb = influxClientProvider.getSetUpInfluxDBClient(environment);
     propertyHandler = new PropertyHandler();
   }
 
diff --git 
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/client/InfluxClientProvider.java
 
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/client/InfluxClientProvider.java
index f1b47285b4..9f23cee0a6 100644
--- 
a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/client/InfluxClientProvider.java
+++ 
b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/client/InfluxClientProvider.java
@@ -34,8 +34,50 @@ import java.util.concurrent.TimeUnit;
 
 public class InfluxClientProvider {
 
+  private static final int DEFAULT_BATCH_SIZE = 2000;
+  private static final int DEFAULT_FLUSH_DURATION = 500;
+
   private static final Logger LOG = 
LoggerFactory.getLogger(InfluxClientProvider.class);
 
+  /**
+   * Create a new InfluxDB client from Environment and ensures database is 
available
+   * @param environment Environment
+   * @return InfluxDB
+   */
+  public InfluxDB getSetUpInfluxDBClient(Environment environment){
+    return getSetUpInfluxDBClient(InfluxConnectionSettings.from(environment));
+  }
+
+  /**
+   * Create a new InfluxDB client from Connection Settings and ensures 
database is available
+   * @param settings Connection Settings
+   * @return InfluxDB
+   */
+  public InfluxDB getSetUpInfluxDBClient(InfluxConnectionSettings settings){
+    var influxDb = getInitializedInfluxDBClient(settings);
+    this.setupDatabaseAndBatching(influxDb, settings.getDatabaseName());
+
+    return influxDb;
+  }
+
+  /**
+   * Create a new InfluxDB client from provided settings and verify it's 
available
+   * @param settings Connection settings
+   * @return InfluxDB
+   */
+  public InfluxDB getInitializedInfluxDBClient(InfluxConnectionSettings 
settings){
+    var influxDb = InfluxClientProvider.getInfluxDBClient(settings);
+
+    // Checking, if server is available
+    var response = influxDb.ping();
+    if (response.getVersion()
+        .equalsIgnoreCase("unknown")) {
+      throw new SpRuntimeException("Could not connect to InfluxDb Server: " + 
settings.getConnectionUrl());
+    }
+
+    return influxDb;
+  }
+
   /**
    * Create a new InfluxDB client from environment variables
    *
@@ -68,20 +110,23 @@ public class InfluxClientProvider {
     }
   }
 
+  /**
+   * Creates the specified database in the influxDb instance if it does not 
exist. Enables batching with default values
+   * @param influxDb The InfluxDB client instance
+   * @param databaseName The name of the database
+   */
+  public void setupDatabaseAndBatching(InfluxDB influxDb, String databaseName) 
{
+    this.setupDatabaseAndBatching(influxDb, databaseName, DEFAULT_BATCH_SIZE, 
DEFAULT_FLUSH_DURATION);
+  }
 
-  public InfluxDB getInitializedInfluxDBClient(Environment environment) {
-
-    var settings = InfluxConnectionSettings.from(environment);
-    var influxDb = InfluxClientProvider.getInfluxDBClient(settings);
-    var databaseName = settings.getDatabaseName();
-
-    // Checking, if server is available
-    var response = influxDb.ping();
-    if (response.getVersion()
-                .equalsIgnoreCase("unknown")) {
-      throw new SpRuntimeException("Could not connect to InfluxDb Server: " + 
settings.getConnectionUrl());
-    }
-
+  /**
+   * Creates the specified database in the influxDb instance if it does not 
exist. Enables batching
+   * @param influxDb The InfluxDB client instance
+   * @param databaseName The name of the database
+   * @param batchSize Batch Size
+   * @param flushDuration Flush Duration
+   */
+  public void setupDatabaseAndBatching(InfluxDB influxDb, String databaseName, 
int batchSize, int flushDuration) {
     // Checking whether the database exists
     if (!databaseExists(influxDb, databaseName)) {
       LOG.info("Database '" + databaseName + "' not found. Gets created ...");
@@ -90,11 +135,7 @@ public class InfluxClientProvider {
 
     // setting up the database
     influxDb.setDatabase(databaseName);
-    var batchSize = 2000;
-    var flushDuration = 500;
     influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
-
-    return influxDb;
   }
 
   /**
diff --git 
a/streampipes-data-explorer-influx/src/test/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInfluxTest.java
 
b/streampipes-data-explorer-influx/src/test/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInfluxTest.java
index 3ddd6f7007..d8e95bb1b0 100644
--- 
a/streampipes-data-explorer-influx/src/test/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInfluxTest.java
+++ 
b/streampipes-data-explorer-influx/src/test/java/org/apache/streampipes/dataexplorer/influx/TimeSeriesStorageInfluxTest.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.dataexplorer.influx;
 
 
+import org.apache.streampipes.commons.environment.Environment;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataexplorer.influx.client.InfluxClientProvider;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
@@ -383,7 +384,7 @@ public class TimeSeriesStorageInfluxTest {
     );
 
     var influxClientProviderMock = Mockito.mock(InfluxClientProvider.class);
-    
Mockito.when(influxClientProviderMock.getInitializedInfluxDBClient(ArgumentMatchers.any()))
+    Mockito.when(influxClientProviderMock.getSetUpInfluxDBClient((Environment) 
ArgumentMatchers.any()))
            .thenReturn(influxDBMock);
 
     return new TimeSeriesStorageInflux(measure, null, 
influxClientProviderMock);
diff --git 
a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/adapter/InfluxDbClient.java
 
b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/adapter/InfluxDbClient.java
index d23af15257..6c83056cd4 100644
--- 
a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/adapter/InfluxDbClient.java
+++ 
b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/adapter/InfluxDbClient.java
@@ -109,7 +109,7 @@ public class InfluxDbClient extends SharedInfluxClient {
 
   public void disconnect() {
     if (connected) {
-      influxDb.close();
+      super.disconnect();
       connected = false;
     }
   }
diff --git 
a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/shared/SharedInfluxClient.java
 
b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/shared/SharedInfluxClient.java
index c455f31aa9..69a3bb50ba 100644
--- 
a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/shared/SharedInfluxClient.java
+++ 
b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/shared/SharedInfluxClient.java
@@ -23,7 +23,6 @@ import 
org.apache.streampipes.dataexplorer.influx.client.InfluxClientProvider;
 import 
org.apache.streampipes.dataexplorer.influx.client.InfluxConnectionSettings;
 
 import org.influxdb.InfluxDB;
-import org.influxdb.dto.Pong;
 
 public abstract class SharedInfluxClient {
 
@@ -42,12 +41,14 @@ public abstract class SharedInfluxClient {
 
 
   protected void initClient() throws SpRuntimeException {
-    this.influxDb = InfluxClientProvider.getInfluxDBClient(connectionSettings);
+    InfluxClientProvider influxClientProvider = new InfluxClientProvider();
+    this.influxDb = 
influxClientProvider.getInitializedInfluxDBClient(connectionSettings);
+  }
 
-    // Checking, if server is available
-    Pong response = influxDb.ping();
-    if (response.getVersion().equalsIgnoreCase("unknown")) {
-      throw new SpRuntimeException("Could not connect to InfluxDb Server: " + 
connectionSettings.getConnectionUrl());
-    }
+  /**
+   * Shuts down the connection to the InfluxDB server
+   */
+  public void disconnect() {
+    influxDb.close();
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.java
 
b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.java
index 9c6f49f411..2a33f2cd41 100644
--- 
a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.java
+++ 
b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbClient.java
@@ -24,17 +24,13 @@ import 
org.apache.streampipes.dataexplorer.influx.client.InfluxConnectionSetting
 import 
org.apache.streampipes.extensions.connectors.influx.shared.SharedInfluxClient;
 import org.apache.streampipes.model.runtime.Event;
 
-import org.influxdb.BatchOptions;
 import org.influxdb.dto.Point;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 public class InfluxDbClient extends SharedInfluxClient {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(InfluxDbClient.class);
 
   private final String timestampField;
   private final Integer batchSize;
@@ -66,17 +62,8 @@ public class InfluxDbClient extends SharedInfluxClient {
    */
   private void connect() throws SpRuntimeException {
     super.initClient();
-    var databaseName = connectionSettings.getDatabaseName();
-
-    // Checking whether the database exists
-    if (!influxClientProvider.databaseExists(influxDb, databaseName)) {
-      LOG.info("Database '" + databaseName + "' not found. Gets created ...");
-      influxClientProvider.createDatabase(influxDb, databaseName);
-    }
-
-    // setting up the database
-    influxDb.setDatabase(databaseName);
-    
influxDb.enableBatch(BatchOptions.DEFAULTS.actions(batchSize).flushDuration(flushDuration));
+    influxClientProvider.setupDatabaseAndBatching(
+        influxDb, connectionSettings.getDatabaseName(), batchSize, 
flushDuration);
   }
 
   /**
@@ -107,11 +94,4 @@ public class InfluxDbClient extends SharedInfluxClient {
 
     influxDb.write(p.build());
   }
-
-  /**
-   * Shuts down the connection to the InfluxDB server
-   */
-  void stop() {
-    influxDb.close();
-  }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
 
b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
index 18f3d2fb54..3da9bc629c 100644
--- 
a/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
+++ 
b/streampipes-extensions/streampipes-connectors-influx/src/main/java/org/apache/streampipes/extensions/connectors/influx/sink/InfluxDbSink.java
@@ -83,7 +83,7 @@ public class InfluxDbSink extends StreamPipesDataSink {
 
   @Override
   public void onDetach() throws SpRuntimeException {
-    influxDbClient.stop();
+    influxDbClient.disconnect();
   }
 
   public static String prepareString(String s) {

Reply via email to