This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7321515c30 fix(#4120): Add bearer token authentication options to 
HttpStreamProtocol (#4130)
7321515c30 is described below

commit 7321515c301e63053d81fd8e97f685eb694a4a5a
Author: Philipp Zehnder <[email protected]>
AuthorDate: Wed Jan 28 10:19:50 2026 +0100

    fix(#4120): Add bearer token authentication options to HttpStreamProtocol 
(#4130)
---
 .../iiot/IIoTAdaptersExtensionModuleExport.java    |  4 +-
 .../migration/HttpStreamProtocolMigrationV1.java   | 73 ++++++++++++++++++++
 .../iiot/protocol/stream/HttpStreamProtocol.java   | 79 +++++++++++++++++++---
 .../documentation.md                               |  8 +++
 .../strings.en                                     |  7 ++
 5 files changed, 162 insertions(+), 9 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
index 4fbdc74ead..cf1069e94a 100644
--- 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.connect.iiot;
 import org.apache.streampipes.connect.iiot.adapters.oi4.Oi4Adapter;
 import 
org.apache.streampipes.connect.iiot.adapters.oi4.migration.Oi4AdapterMigrationV1;
 import 
org.apache.streampipes.connect.iiot.adapters.simulator.machine.MachineDataSimulatorAdapter;
+import 
org.apache.streampipes.connect.iiot.migration.HttpStreamProtocolMigrationV1;
 import 
org.apache.streampipes.connect.iiot.migration.MachineDataSimulatorMigrationV1;
 import org.apache.streampipes.connect.iiot.protocol.stream.FileReplayAdapter;
 import org.apache.streampipes.connect.iiot.protocol.stream.HttpServerProtocol;
@@ -54,6 +55,7 @@ public class IIoTAdaptersExtensionModuleExport implements 
IExtensionModuleExport
   public List<IModelMigrator<?, ?>> migrators() {
     return List.of(
         new Oi4AdapterMigrationV1(),
-        new MachineDataSimulatorMigrationV1());
+        new MachineDataSimulatorMigrationV1(),
+        new HttpStreamProtocolMigrationV1());
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/migration/HttpStreamProtocolMigrationV1.java
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/migration/HttpStreamProtocolMigrationV1.java
new file mode 100644
index 0000000000..0f5d76e102
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/migration/HttpStreamProtocolMigrationV1.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.iiot.migration;
+
+import org.apache.streampipes.connect.iiot.protocol.stream.HttpStreamProtocol;
+import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+public class HttpStreamProtocolMigrationV1 implements IAdapterMigrator {
+
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(
+        HttpStreamProtocol.ID,
+        SpServiceTagPrefix.ADAPTER,
+        0,
+        1
+    );
+  }
+
+  @Override
+  public MigrationResult<AdapterDescription> migrate(AdapterDescription 
element,
+                                                     IStaticPropertyExtractor 
extractor) throws RuntimeException {
+    var headerCollection = makeHeaderCollection();
+    int insertIndex = Math.min(2, element.getConfig().size());
+    element.getConfig().add(insertIndex, headerCollection);
+    return MigrationResult.success(element);
+  }
+
+  private CollectionStaticProperty makeHeaderCollection() {
+    var headerKey = StaticProperties.stringFreeTextProperty(
+        Labels.withId(HttpStreamProtocol.HEADER_KEY)
+    );
+    headerKey.setOptional(true);
+    headerKey.setValue("");
+
+    var headerValue = StaticProperties.stringFreeTextProperty(
+        Labels.withId(HttpStreamProtocol.HEADER_VALUE)
+    );
+    headerValue.setOptional(true);
+    headerValue.setValue("");
+
+    return StaticProperties.collection(
+        Labels.withId(HttpStreamProtocol.HEADER_COLLECTION),
+        false,
+        headerKey,
+        headerValue
+    );
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
index 87d945428a..72fbfabbb3 100644
--- 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
@@ -34,7 +34,12 @@ import 
org.apache.streampipes.extensions.management.connect.adapter.parser.Parse
 import 
org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings;
 import org.apache.streampipes.model.connect.guess.SampleData;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
+import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 
@@ -44,6 +49,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 public class HttpStreamProtocol implements StreamPipesAdapter, IPullAdapter {
@@ -54,9 +61,12 @@ public class HttpStreamProtocol implements 
StreamPipesAdapter, IPullAdapter {
 
   private static final String URL_PROPERTY = "url";
   private static final String INTERVAL_PROPERTY = "interval";
+  public static final String HEADER_COLLECTION = "header-collection";
+  public static final String HEADER_KEY = "header-key";
+  public static final String HEADER_VALUE = "header-value";
 
   private String url;
-  private String accessToken;
+  private List<HeaderConfiguration> headerConfigurations = new ArrayList<>();
 
   private PollingSettings pollingSettings;
   private PullAdapterScheduler pullAdapterScheduler;
@@ -71,9 +81,7 @@ public class HttpStreamProtocol implements 
StreamPipesAdapter, IPullAdapter {
     this.url = extractor.singleValueParameter(URL_PROPERTY, String.class);
     int interval = extractor.singleValueParameter(INTERVAL_PROPERTY, 
Integer.class);
     this.pollingSettings = PollingSettings.from(TimeUnit.SECONDS, interval);
-    // TODO change access token to an optional parameter
-//            String accessToken = 
extractor.singleValue(ACCESS_TOKEN_PROPERTY);
-    this.accessToken = "";
+    this.headerConfigurations = getHeaderConfigurations(extractor);
   }
 
   private InputStream getDataFromEndpoint() throws ParseException {
@@ -83,8 +91,10 @@ public class HttpStreamProtocol implements 
StreamPipesAdapter, IPullAdapter {
           .connectTimeout(1000)
           .socketTimeout(100000);
 
-      if (this.accessToken != null && !this.accessToken.equals("")) {
-        request.setHeader("Authorization", "Bearer " + this.accessToken);
+      for (HeaderConfiguration header : headerConfigurations) {
+        if (header.headerKey != null && !header.headerKey.isBlank()) {
+          request.addHeader(header.headerKey, header.headerValue == null ? "" 
: header.headerValue);
+        }
       }
 
       var result = request
@@ -104,13 +114,33 @@ public class HttpStreamProtocol implements 
StreamPipesAdapter, IPullAdapter {
 
   @Override
   public IAdapterConfiguration declareConfig() {
+    var headerKey = StaticProperties.stringFreeTextProperty(
+        Labels.withId(HEADER_KEY)
+    );
+    headerKey.setOptional(true);
+    headerKey.setValue("");
+
+    var headerValue = StaticProperties.stringFreeTextProperty(
+        Labels.withId(HEADER_VALUE)
+    );
+    headerValue.setOptional(true);
+    headerValue.setValue("");
+
     return AdapterConfigurationBuilder
-        .create(ID, 0, HttpStreamProtocol::new)
+        .create(ID, 1, HttpStreamProtocol::new)
         .withSupportedParsers(Parsers.defaultParsers())
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .withLocales(Locales.EN)
         .requiredTextParameter(Labels.withId(URL_PROPERTY))
         .requiredIntegerParameter(Labels.withId(INTERVAL_PROPERTY))
+        .requiredStaticProperty(
+            StaticProperties.collection(
+                Labels.withId(HEADER_COLLECTION),
+                false,
+                headerKey,
+                headerValue
+            )
+        )
         .buildConfiguration();
   }
 
@@ -128,7 +158,7 @@ public class HttpStreamProtocol implements 
StreamPipesAdapter, IPullAdapter {
 
   @Override
   public void onAdapterStopped(IAdapterParameterExtractor extractor,
-                               IAdapterRuntimeContext adapterRuntimeContext) 
throws AdapterException {
+                               IAdapterRuntimeContext adapterRuntimeContext) {
     this.pullAdapterScheduler.shutdown();
   }
 
@@ -153,4 +183,37 @@ public class HttpStreamProtocol implements 
StreamPipesAdapter, IPullAdapter {
   public PollingSettings getPollingInterval() {
     return pollingSettings;
   }
+
+  private List<HeaderConfiguration> 
getHeaderConfigurations(IStaticPropertyExtractor extractor) {
+    List<HeaderConfiguration> headers = new ArrayList<>();
+    var collection = extractor.getStaticPropertyByName(HEADER_COLLECTION);
+    if (!(collection instanceof CollectionStaticProperty csp)) {
+      return headers;
+    }
+    if (csp.getMembers() == null) {
+      return headers;
+    }
+    for (StaticProperty member : csp.getMembers()) {
+      if (!(member instanceof StaticPropertyGroup group)) {
+        continue;
+      }
+      var memberExtractor = 
StaticPropertyExtractor.from(group.getStaticProperties(), new ArrayList<>());
+      var headerKey = memberExtractor.textParameter(HEADER_KEY);
+      var headerValue = memberExtractor.textParameter(HEADER_VALUE);
+      if (headerKey != null && !headerKey.isBlank()) {
+        headers.add(new HeaderConfiguration(headerKey, headerValue));
+      }
+    }
+    return headers;
+  }
+
+  private static class HeaderConfiguration {
+    private final String headerKey;
+    private final String headerValue;
+
+    private HeaderConfiguration(String headerKey, String headerValue) {
+      this.headerKey = headerKey;
+      this.headerValue = headerValue;
+    }
+  }
 }
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/documentation.md
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/documentation.md
index aeb5205988..46a75ef9e4 100644
--- 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/documentation.md
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/documentation.md
@@ -30,3 +30,11 @@ Continuously fetched events from an HTTP REST endpoint.
 
 ***
 
+## Configuration
+
+- URL: The HTTP endpoint to poll.
+- Interval: Polling interval in seconds.
+- Request Headers: Optional list of custom headers to include in every 
request. Each entry contains a Header Key and
+  Header Value (e.g., `Authorization` / `Bearer <token>`).
+
+***
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/strings.en
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/strings.en
index ac361ccaa3..10ab3b5027 100644
--- 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/strings.en
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.http/strings.en
@@ -25,4 +25,11 @@ url.description=Example: http(s)://test-server.com
 interval.title=Interval [sec]
 interval.description=Example: 5 (Polling interval in seconds)
 
+header-collection.title=Request Headers
+header-collection.description=Optional custom headers to include with the 
request.
 
+header-key.title=Header Key
+header-key.description=Example: Authorization
+
+header-value.title=Header Value
+header-value.description=Example: Bearer <token>

Reply via email to