This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 828b1b1cb4 feat: Add environment variable for extensions file storage (#3819)
828b1b1cb4 is described below
commit 828b1b1cb4837d070fe23bab2cbf8649fa2e3254
Author: Dominik Riemer <[email protected]>
AuthorDate: Tue Oct 7 15:35:29 2025 +0200
feat: Add environment variable for extensions file storage (#3819)
---
.../apache/streampipes/commons/constants/Envs.java | 1 +
.../commons/environment/DefaultEnvironment.java | 5 +++++
.../commons/environment/Environment.java | 2 ++
.../connect/iiot/utils/FileProtocolUtils.java | 8 +++++++-
.../security/CompositeCertificateValidator.java | 10 +++++++++-
.../connectors/opcua/utils/OpcUaUtils.java | 21 +++++++++++++++++----
6 files changed, 41 insertions(+), 6 deletions(-)
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 7158b50974..bd06f7317a 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -23,6 +23,7 @@ public enum Envs {
SP_PORT("SP_PORT"),
SP_CORE_ASSET_BASE_DIR("SP_CORE_ASSET_BASE_DIR"),
+ SP_EXT_ASSET_BASE_DIR("SP_EXT_ASSET_BASE_DIR"),
SP_CORE_SCHEME("SP_CORE_SCHEME", "http", "http"),
SP_CORE_HOST("SP_CORE_HOST", "backend", "localhost"),
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index a9f35337d9..c8f5f0686c 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -233,6 +233,11 @@ public class DefaultEnvironment implements Environment {
return new StringEnvironmentVariable(Envs.SP_CORE_ASSET_BASE_DIR);
}
+ /**
+  * Returns the environment variable wrapper for {@code SP_EXT_ASSET_BASE_DIR}.
+  *
+  * @return a new {@link StringEnvironmentVariable} backed by {@link Envs#SP_EXT_ASSET_BASE_DIR}
+  */
+ @Override
+ public StringEnvironmentVariable getExtAssetBaseDir() {
+   final Envs extAssetBaseDir = Envs.SP_EXT_ASSET_BASE_DIR;
+   return new StringEnvironmentVariable(extAssetBaseDir);
+ }
+
@Override
public StringEnvironmentVariable getFlinkJarFileLoc() {
return new StringEnvironmentVariable(Envs.SP_FLINK_JAR_FILE_LOC);
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index e168cc23c4..07ed38c9df 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -98,6 +98,8 @@ public interface Environment {
StringEnvironmentVariable getCoreAssetBaseDir();
+ StringEnvironmentVariable getExtAssetBaseDir();
+
// Flink Wrapper
StringEnvironmentVariable getFlinkJarFileLoc();
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
index 6c7ece4c44..e0b5bbf872 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.connect.iiot.utils;
import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.connect.ParseException;
import org.apache.streampipes.commons.file.FileHasher;
import
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
@@ -86,7 +87,12 @@ public class FileProtocolUtils {
}
private static String makeServiceStorageDir() {
- return System.getProperty("user.home")
+ var storageDir = Environments
+ .getEnvironment()
+ .getExtAssetBaseDir()
+ .getValueOrReturn(System.getProperty("user.home"));
+
+ return storageDir
+ File.separator
+ ".streampipes"
+ File.separator
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/CompositeCertificateValidator.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/CompositeCertificateValidator.java
index c88392c729..f5c391ffa7 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/CompositeCertificateValidator.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/CompositeCertificateValidator.java
@@ -71,6 +71,7 @@ public class CompositeCertificateValidator implements
ClientCertificateValidator
public void validateCertificateChain(List<X509Certificate> certificateChain)
throws UaException {
PKIXCertPathBuilderResult certPathResult;
+ X509Certificate peer = getEndEntity(certificateChain);
try {
certPathResult = CertificateValidationUtil.buildTrustedCertPath(
certificateChain,
@@ -79,7 +80,7 @@ public class CompositeCertificateValidator implements
ClientCertificateValidator
);
} catch (UaException e) {
if (isCertificateRejected(e.getStatusCode().getValue())) {
- sendToCore(certificateChain.get(0));
+ sendToCore(peer);
}
throw e;
}
@@ -97,6 +98,13 @@ public class CompositeCertificateValidator implements
ClientCertificateValidator
);
}
+ /**
+  * Picks the certificate of a chain that is treated as the peer certificate.
+  *
+  * <p>Returns the first certificate whose {@code getBasicConstraints()} value is negative
+  * (the value the JDK reports for certificates that are not a CA). If no certificate
+  * matches, the first element of the chain is returned.
+  *
+  * @param chain the certificate chain to search; must contain at least one element
+  * @return the first certificate with negative basic constraints, or {@code chain.get(0)}
+  */
+ private X509Certificate getEndEntity(List<X509Certificate> chain) {
+   for (X509Certificate candidate : chain) {
+     if (candidate.getBasicConstraints() < 0) {
+       return candidate;
+     }
+   }
+   // No matching certificate found: fall back to the head of the chain.
+   return chain.get(0);
+ }
+
@Override
public void validateCertificateChain(
List<X509Certificate> certificateChain,
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtils.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtils.java
index df09ed3050..5debd21414 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtils.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtils.java
@@ -34,6 +34,8 @@ import org.eclipse.milo.opcua.sdk.client.api.UaClient;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.util.List;
@@ -45,6 +47,8 @@ import java.util.concurrent.ExecutionException;
*/
public class OpcUaUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(OpcUaUtils.class);
+
private static final String OPC_TCP_PREFIX = "opc.tcp://";
/***
@@ -141,19 +145,28 @@ public class OpcUaUtils {
Throwable cause = e.getCause();
if (cause instanceof UaException uaException) {
- return CompositeCertificateValidator.REJECTED_STATUS_CODES
- .contains(uaException.getStatusCode().getValue());
+ return checkAndLogCertificateException(uaException);
}
Throwable nestedCause = cause != null ? cause.getCause() : null;
if (nestedCause instanceof UaException uaException) {
- return CompositeCertificateValidator.REJECTED_STATUS_CODES
- .contains(uaException.getStatusCode().getValue());
+ return checkAndLogCertificateException(uaException);
}
return false;
}
+ /**
+  * Checks whether the status code of the given exception is contained in
+  * {@link CompositeCertificateValidator#REJECTED_STATUS_CODES} and, if so, logs it at WARN level.
+  *
+  * @param e the OPC UA exception whose status code is inspected
+  * @return {@code true} if the status code value is one of the rejected status codes
+  */
+ private static boolean checkAndLogCertificateException(UaException e) {
+   // Read the value once so the membership check and the log line use the same value.
+   var statusCodeValue = e.getStatusCode().getValue();
+   var containsRejectedStatusCode =
+       CompositeCertificateValidator.REJECTED_STATUS_CODES.contains(statusCodeValue);
+
+   if (containsRejectedStatusCode) {
+     LOG.warn("Status Code: {}", statusCodeValue);
+   }
+   return containsRejectedStatusCode;
+ }
+
private static String makeExceptionMessage(ExecutionException e) {
StringBuilder message = new StringBuilder(
"The provided certificate could not be trusted. Administrators can
accept this certificate in the settings. "