This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new ac7fae8a27 Speeding up compact adapter creation by replacing findAll
with getting the specific adapter (#3791)
ac7fae8a27 is described below
commit ac7fae8a2793cd9946b73112b2efa6b20f051f07
Author: Jacqueline Höllig <[email protected]>
AuthorDate: Mon Sep 29 10:04:02 2025 +0200
Speeding up compact adapter creation by replacing findAll with getting the
specific adapter (#3791)
* minor fix for speeding up the creation of compact adapters
* added logs
* clean up
* maven runs
---
.../management/AdapterMasterManagement.java | 41 +++++++++-------------
1 file changed, 16 insertions(+), 25 deletions(-)
diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
index 9b1002c527..c344a42527 100644
--- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
+++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
@@ -17,7 +17,6 @@
*/
package org.apache.streampipes.connect.management.management;
-
import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
@@ -41,7 +40,8 @@ import java.util.List;
import java.util.NoSuchElementException;
/**
- * This class is responsible for managing all the adapter instances which are executed on worker nodes
+ * This class is responsible for managing all the adapter instances which are
+ * executed on worker nodes
*/
public class AdapterMasterManagement {
@@ -57,8 +57,7 @@ public class AdapterMasterManagement {
IAdapterStorage adapterInstanceStorage,
AdapterResourceManager adapterResourceManager,
DataStreamResourceManager dataStreamResourceManager,
- AdapterMetrics adapterMetrics
- ) {
+ AdapterMetrics adapterMetrics) {
this.adapterInstanceStorage = adapterInstanceStorage;
this.adapterMetrics = adapterMetrics;
this.adapterResourceManager = adapterResourceManager;
@@ -68,8 +67,7 @@ public class AdapterMasterManagement {
public void addAdapter(
AdapterDescription adapterDescription,
String adapterId,
- String principalSid
- )
+ String principalSid)
throws AdapterException {
// Create elementId for datastream
@@ -92,8 +90,7 @@ public class AdapterMasterManagement {
AdapterDescription adapterDescription,
String adapterId,
String streamId,
- String principalSid
- ) throws AdapterException {
+ String principalSid) throws AdapterException {
var storedDescription = new SourcesManagement()
.createAdapterDataStream(adapterDescription, streamId);
storedDescription.setCorrespondingAdapterId(adapterId);
@@ -102,21 +99,16 @@ public class AdapterMasterManagement {
}
public AdapterDescription getAdapter(String elementId) throws AdapterException {
- List<AdapterDescription> allAdapters = adapterInstanceStorage.findAll();
-
- if (allAdapters != null && elementId != null) {
- for (AdapterDescription ad : allAdapters) {
- if (elementId.equals(ad.getElementId())) {
- return ad;
- }
- }
+ AdapterDescription adapter = adapterInstanceStorage.getElementById(elementId);
+ if (adapter == null) {
+ throw new AdapterException("Adapter with ID " + elementId + " not found");
}
-
- throw new AdapterException("Could not find adapter with id: " + elementId);
+ return adapter;
}
/**
- * First the adapter is stopped removed, then the corresponding data source is deleted
+ * First the adapter is stopped removed, then the corresponding data source is
+ * deleted
*
* @param elementId The elementId of the adapter instance
* @throws AdapterException when adapter can not be stopped
@@ -146,7 +138,7 @@ public class AdapterMasterManagement {
}
public void stopStreamAdapter(String elementId,
- boolean forceStop) throws AdapterException {
+ boolean forceStop) throws AdapterException {
AdapterDescription ad = adapterInstanceStorage.getElementById(elementId);
try {
@@ -181,8 +173,7 @@ public class AdapterMasterManagement {
ad.getAppId(),
SpServiceUrlProvider.ADAPTER,
ad.getDeploymentConfiguration()
- .getDesiredServiceTags()
- );
+ .getDesiredServiceTags());
// Update selected endpoint URL of adapter
ad.setSelectedEndpointUrl(baseUrl);
@@ -191,7 +182,8 @@ public class AdapterMasterManagement {
// Invoke adapter instance
WorkerRestClient.invokeStreamAdapter(baseUrl, elementId);
- // register the adapter at the metrics manager so that the AdapterHealthCheck can send metrics
+ // register the adapter at the metrics manager so that the AdapterHealthCheck
+ // can send metrics
adapterMetrics.register(ad.getElementId(), ad.getName());
LOG.info("Started adapter " + elementId + " on: " + baseUrl);
@@ -202,8 +194,7 @@ public class AdapterMasterManagement {
private void installDataSource(
SpDataStream stream,
- String principalSid
- ) throws AdapterException {
+ String principalSid) throws AdapterException {
try {
new DataStreamVerifier(stream).verifyAndAdd(principalSid, false);
} catch (SepaParseException e) {