This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 7321515c30 fix(#4120): Add bearer token authentication options to
HttpStreamProtocol (#4130)
7321515c30 is described below
commit 7321515c301e63053d81fd8e97f685eb694a4a5a
Author: Philipp Zehnder <[email protected]>
AuthorDate: Wed Jan 28 10:19:50 2026 +0100
fix(#4120): Add bearer token authentication options to HttpStreamProtocol
(#4130)
---
.../iiot/IIoTAdaptersExtensionModuleExport.java | 4 +-
.../migration/HttpStreamProtocolMigrationV1.java | 73 ++++++++++++++++++++
.../iiot/protocol/stream/HttpStreamProtocol.java | 79 +++++++++++++++++++---
.../documentation.md | 8 +++
.../strings.en | 7 ++
5 files changed, 162 insertions(+), 9 deletions(-)
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
index 4fbdc74ead..cf1069e94a 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.connect.iiot;
import org.apache.streampipes.connect.iiot.adapters.oi4.Oi4Adapter;
import
org.apache.streampipes.connect.iiot.adapters.oi4.migration.Oi4AdapterMigrationV1;
import
org.apache.streampipes.connect.iiot.adapters.simulator.machine.MachineDataSimulatorAdapter;
+import
org.apache.streampipes.connect.iiot.migration.HttpStreamProtocolMigrationV1;
import
org.apache.streampipes.connect.iiot.migration.MachineDataSimulatorMigrationV1;
import org.apache.streampipes.connect.iiot.protocol.stream.FileReplayAdapter;
import org.apache.streampipes.connect.iiot.protocol.stream.HttpServerProtocol;
@@ -54,6 +55,7 @@ public class IIoTAdaptersExtensionModuleExport implements
IExtensionModuleExport
public List<IModelMigrator<?, ?>> migrators() {
return List.of(
new Oi4AdapterMigrationV1(),
- new MachineDataSimulatorMigrationV1());
+ new MachineDataSimulatorMigrationV1(),
+ new HttpStreamProtocolMigrationV1());
}
}
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/migration/HttpStreamProtocolMigrationV1.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/migration/HttpStreamProtocolMigrationV1.java
new file mode 100644
index 0000000000..0f5d76e102
--- /dev/null
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/migration/HttpStreamProtocolMigrationV1.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.iiot.migration;
+
+import org.apache.streampipes.connect.iiot.protocol.stream.HttpStreamProtocol;
+import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+public class HttpStreamProtocolMigrationV1 implements IAdapterMigrator {
+
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ HttpStreamProtocol.ID,
+ SpServiceTagPrefix.ADAPTER,
+ 0,
+ 1
+ );
+ }
+
+ @Override
+ public MigrationResult<AdapterDescription> migrate(AdapterDescription
element,
+ IStaticPropertyExtractor
extractor) throws RuntimeException {
+ var headerCollection = makeHeaderCollection();
+ int insertIndex = Math.min(2, element.getConfig().size());
+ element.getConfig().add(insertIndex, headerCollection);
+ return MigrationResult.success(element);
+ }
+
+ private CollectionStaticProperty makeHeaderCollection() {
+ var headerKey = StaticProperties.stringFreeTextProperty(
+ Labels.withId(HttpStreamProtocol.HEADER_KEY)
+ );
+ headerKey.setOptional(true);
+ headerKey.setValue("");
+
+ var headerValue = StaticProperties.stringFreeTextProperty(
+ Labels.withId(HttpStreamProtocol.HEADER_VALUE)
+ );
+ headerValue.setOptional(true);
+ headerValue.setValue("");
+
+ return StaticProperties.collection(
+ Labels.withId(HttpStreamProtocol.HEADER_COLLECTION),
+ false,
+ headerKey,
+ headerValue
+ );
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
index 87d945428a..72fbfabbb3 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
@@ -34,7 +34,12 @@ import
org.apache.streampipes.extensions.management.connect.adapter.parser.Parse
import
org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings;
import org.apache.streampipes.model.connect.guess.SampleData;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
+import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
@@ -44,6 +49,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.TimeUnit;
public class HttpStreamProtocol implements StreamPipesAdapter, IPullAdapter {
@@ -54,9 +61,12 @@ public class HttpStreamProtocol implements
StreamPipesAdapter, IPullAdapter {
private static final String URL_PROPERTY = "url";
private static final String INTERVAL_PROPERTY = "interval";
+ public static final String HEADER_COLLECTION = "header-collection";
+ public static final String HEADER_KEY = "header-key";
+ public static final String HEADER_VALUE = "header-value";
private String url;
- private String accessToken;
+ private List<HeaderConfiguration> headerConfigurations = new ArrayList<>();
private PollingSettings pollingSettings;
private PullAdapterScheduler pullAdapterScheduler;
@@ -71,9 +81,7 @@ public class HttpStreamProtocol implements
StreamPipesAdapter, IPullAdapter {
this.url = extractor.singleValueParameter(URL_PROPERTY, String.class);
int interval = extractor.singleValueParameter(INTERVAL_PROPERTY,
Integer.class);
this.pollingSettings = PollingSettings.from(TimeUnit.SECONDS, interval);
- // TODO change access token to an optional parameter
-// String accessToken =
extractor.singleValue(ACCESS_TOKEN_PROPERTY);
- this.accessToken = "";
+ this.headerConfigurations = getHeaderConfigurations(extractor);
}
private InputStream getDataFromEndpoint() throws ParseException {
@@ -83,8 +91,10 @@ public class HttpStreamProtocol implements
StreamPipesAdapter, IPullAdapter {
.connectTimeout(1000)
.socketTimeout(100000);
- if (this.accessToken != null && !this.accessToken.equals("")) {
- request.setHeader("Authorization", "Bearer " + this.accessToken);
+ for (HeaderConfiguration header : headerConfigurations) {
+ if (header.headerKey != null && !header.headerKey.isBlank()) {
+ request.addHeader(header.headerKey, header.headerValue == null ? ""
: header.headerValue);
+ }
}
var result = request
@@ -104,13 +114,33 @@ public class HttpStreamProtocol implements
StreamPipesAdapter, IPullAdapter {
@Override
public IAdapterConfiguration declareConfig() {
+ var headerKey = StaticProperties.stringFreeTextProperty(
+ Labels.withId(HEADER_KEY)
+ );
+ headerKey.setOptional(true);
+ headerKey.setValue("");
+
+ var headerValue = StaticProperties.stringFreeTextProperty(
+ Labels.withId(HEADER_VALUE)
+ );
+ headerValue.setOptional(true);
+ headerValue.setValue("");
+
return AdapterConfigurationBuilder
- .create(ID, 0, HttpStreamProtocol::new)
+ .create(ID, 1, HttpStreamProtocol::new)
.withSupportedParsers(Parsers.defaultParsers())
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.requiredTextParameter(Labels.withId(URL_PROPERTY))
.requiredIntegerParameter(Labels.withId(INTERVAL_PROPERTY))
+ .requiredStaticProperty(
+ StaticProperties.collection(
+ Labels.withId(HEADER_COLLECTION),
+ false,
+ headerKey,
+ headerValue
+ )
+ )
.buildConfiguration();
}
@@ -128,7 +158,7 @@ public class HttpStreamProtocol implements
StreamPipesAdapter, IPullAdapter {
@Override
public void onAdapterStopped(IAdapterParameterExtractor extractor,
- IAdapterRuntimeContext adapterRuntimeContext)
throws AdapterException {
+ IAdapterRuntimeContext adapterRuntimeContext) {
this.pullAdapterScheduler.shutdown();
}
@@ -153,4 +183,37 @@ public class HttpStreamProtocol implements
StreamPipesAdapter, IPullAdapter {
public PollingSettings getPollingInterval() {
return pollingSettings;
}
+
+ private List<HeaderConfiguration>
getHeaderConfigurations(IStaticPropertyExtractor extractor) {
+ List<HeaderConfiguration> headers = new ArrayList<>();
+ var collection = extractor.getStaticPropertyByName(HEADER_COLLECTION);
+ if (!(collection instanceof CollectionStaticProperty csp)) {
+ return headers;
+ }
+ if (csp.getMembers() == null) {
+ return headers;
+ }
+ for (StaticProperty member : csp.getMembers()) {
+ if (!(member instanceof StaticPropertyGroup group)) {
+ continue;
+ }
+ var memberExtractor =
StaticPropertyExtractor.from(group.getStaticProperties(), new ArrayList<>());
+ var headerKey = memberExtractor.textParameter(HEADER_KEY);
+ var headerValue = memberExtractor.textParameter(HEADER_VALUE);
+ if (headerKey != null && !headerKey.isBlank()) {
+ headers.add(new HeaderConfiguration(headerKey, headerValue));
+ }
+ }
+ return headers;
+ }
+
+ private static class HeaderConfiguration {
+ private final String headerKey;
+ private final String headerValue;
+
+ private HeaderConfiguration(String headerKey, String headerValue) {
+ this.headerKey = headerKey;
+ this.headerValue = headerValue;
+ }
+ }
}
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/documentation.md
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/documentation.md
index aeb5205988..46a75ef9e4 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/documentation.md
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/documentation.md
@@ -30,3 +30,11 @@ Continuously fetched events from an HTTP REST endpoint.
***
+## Configuration
+
+- URL: The HTTP endpoint to poll.
+- Interval: Polling interval in seconds.
+- Request Headers: Optional list of custom headers to include in every
request. Each entry contains a Header Key and
+ Header Value (e.g., `Authorization` / `Bearer <token>`).
+
+***
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/strings.en
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/strings.en
index ac361ccaa3..10ab3b5027 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/strings.en
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/strings.en
@@ -25,4 +25,11 @@ url.description=Example: http(s)://test-server.com
interval.title=Interval [sec]
interval.description=Example: 5 (Polling interval in seconds)
+header-collection.title=Request Headers
+header-collection.description=Optional custom headers to include with the
request.
+header-key.title=Header Key
+header-key.description=Example: Authorization
+
+header-value.title=Header Value
+header-value.description=Example: Bearer <token>