This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch 2914-feat-support-http-proxy-in-ms-teams-sink
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to 
refs/heads/2914-feat-support-http-proxy-in-ms-teams-sink by this push:
     new 47c1d68e40 feat(#2914): Add proxy configuration to Teams sink
47c1d68e40 is described below

commit 47c1d68e403aa14c2b38837c17e0af1bd0e356a6
Author: Dominik Riemer <[email protected]>
AuthorDate: Mon Jun 3 15:33:32 2024 +0200

    feat(#2914): Add proxy configuration to Teams sink
---
 .../management/locales/LabelGenerator.java         | 83 ++++++++++++++--------
 .../jvm/NotificationsExtensionModuleExport.java    |  8 ++-
 .../jvm/migrations/MsTeamsSinkMigrationV1.java     | 65 +++++++++++++++++
 .../notifications/jvm/msteams/MSTeamsSink.java     | 40 +++++++++--
 .../strings.en                                     |  7 ++
 .../model/graph/DataProcessorInvocation.java       |  2 +
 .../model/graph/DataSinkInvocation.java            |  2 +
 7 files changed, 168 insertions(+), 39 deletions(-)

diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/locales/LabelGenerator.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/locales/LabelGenerator.java
index 56a151a13b..7eb83a99bc 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/locales/LabelGenerator.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/locales/LabelGenerator.java
@@ -20,11 +20,14 @@ package org.apache.streampipes.extensions.management.locales;
 import org.apache.streampipes.extensions.api.assets.AssetResolver;
 import org.apache.streampipes.extensions.api.assets.DefaultAssetResolver;
 import org.apache.streampipes.model.base.ConsumableStreamPipesEntity;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.output.AppendOutputStrategy;
 import org.apache.streampipes.model.output.FixedOutputStrategy;
+import org.apache.streampipes.model.output.OutputStrategy;
 import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
@@ -32,6 +35,7 @@ import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
 import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Properties;
 
 public class LabelGenerator<T extends NamedStreamPipesEntity> {
@@ -84,40 +88,51 @@ public class LabelGenerator<T extends NamedStreamPipesEntity> {
             });
       }
 
-      if (isDataProcessor()) {
-        ((DataProcessorDescription) desc).getOutputStrategies()
-            .forEach(os -> {
-              if (os instanceof AppendOutputStrategy) {
-                ((AppendOutputStrategy) os).getEventProperties()
-                    .forEach(ep -> {
-                      ep.setLabel(getTitle(
-                          props,
-                          ep.getRuntimeId()
-                      ));
-                      ep.setDescription(getDescription(
-                          props,
-                          ep.getRuntimeId()
-                      ));
-                    });
-              } else if (os instanceof FixedOutputStrategy) {
-                ((FixedOutputStrategy) os).getEventProperties()
-                    .forEach(ep -> {
-                      ep.setLabel(getTitle(
-                          props,
-                          ep.getRuntimeId()
-                      ));
-                      ep.setDescription(getDescription(
-                          props,
-                          ep.getRuntimeId()
-                      ));
-                    });
-              }
-            });
+      if (isInvocable()) {
+        ((InvocableStreamPipesEntity) desc).getStaticProperties().forEach(sp -> generateLabels(props, sp));
+        if (isDataProcessorInvocation()) {
+          applyOutputStrategies(((DataProcessorInvocation) desc).getOutputStrategies(), props);
+        }
+      }
+
+      if (isDataProcessorDescription()) {
+        applyOutputStrategies(((DataProcessorDescription) desc).getOutputStrategies(), props);
       }
     }
     return desc;
   }
 
+  private void applyOutputStrategies(List<OutputStrategy> outputStrategies,
+                                     Properties props) {
+    outputStrategies.forEach(os -> {
+      if (os instanceof AppendOutputStrategy) {
+        ((AppendOutputStrategy) os).getEventProperties()
+            .forEach(ep -> {
+              ep.setLabel(getTitle(
+                  props,
+                  ep.getRuntimeId()
+              ));
+              ep.setDescription(getDescription(
+                  props,
+                  ep.getRuntimeId()
+              ));
+            });
+      } else if (os instanceof FixedOutputStrategy) {
+        ((FixedOutputStrategy) os).getEventProperties()
+            .forEach(ep -> {
+              ep.setLabel(getTitle(
+                  props,
+                  ep.getRuntimeId()
+              ));
+              ep.setDescription(getDescription(
+                  props,
+                  ep.getRuntimeId()
+              ));
+            });
+      }
+    });
+  }
+
 
   /**
    * Returns the tile of the element description based on the data of the 
resource files
@@ -185,10 +200,18 @@ public class LabelGenerator<T extends 
NamedStreamPipesEntity> {
     return desc instanceof ConsumableStreamPipesEntity;
   }
 
-  private boolean isDataProcessor() {
+  private boolean isDataProcessorDescription() {
     return desc instanceof DataProcessorDescription;
   }
 
+  private boolean isDataProcessorInvocation() {
+    return desc instanceof DataProcessorInvocation;
+  }
+
+  private boolean isInvocable() {
+    return desc instanceof InvocableStreamPipesEntity;
+  }
+
   private boolean isAdapter() {
     return desc instanceof AdapterDescription;
   }
diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/NotificationsExtensionModuleExport.java b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/NotificationsExtensionModuleExport.java
index 87cac004f7..e75a73f12a 100644
--- a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/NotificationsExtensionModuleExport.java
+++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/NotificationsExtensionModuleExport.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport;
 import org.apache.streampipes.extensions.api.migration.IModelMigrator;
 import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
 import org.apache.streampipes.sinks.notifications.jvm.email.EmailSink;
+import org.apache.streampipes.sinks.notifications.jvm.migrations.MsTeamsSinkMigrationV1;
 import org.apache.streampipes.sinks.notifications.jvm.migrations.OneSignalSinkMigrationV1;
 import org.apache.streampipes.sinks.notifications.jvm.migrations.SlackNotificationSinkMigrationV1;
 import org.apache.streampipes.sinks.notifications.jvm.migrations.TelegramSinkMigrationV1;
@@ -54,9 +55,10 @@ public class NotificationsExtensionModuleExport implements 
IExtensionModuleExpor
   @Override
   public List<IModelMigrator<?, ?>> migrators() {
     return List.of(
-            new OneSignalSinkMigrationV1(),
-            new SlackNotificationSinkMigrationV1(),
-            new TelegramSinkMigrationV1()
+        new OneSignalSinkMigrationV1(),
+        new SlackNotificationSinkMigrationV1(),
+        new TelegramSinkMigrationV1(),
+        new MsTeamsSinkMigrationV1()
     );
   }
 }
diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/MsTeamsSinkMigrationV1.java b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/MsTeamsSinkMigrationV1.java
new file mode 100644
index 0000000000..5f1c0c8d8b
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/MsTeamsSinkMigrationV1.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.notifications.jvm.migrations;
+
+import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Alternatives;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sinks.notifications.jvm.msteams.MSTeamsSink;
+
+import static org.apache.streampipes.sinks.notifications.jvm.msteams.MSTeamsSink.KEY_PROXY_ALTERNATIVES;
+import static org.apache.streampipes.sinks.notifications.jvm.msteams.MSTeamsSink.KEY_PROXY_DISABLED;
+import static org.apache.streampipes.sinks.notifications.jvm.msteams.MSTeamsSink.KEY_PROXY_ENABLED;
+import static org.apache.streampipes.sinks.notifications.jvm.msteams.MSTeamsSink.KEY_PROXY_GROUP;
+import static org.apache.streampipes.sinks.notifications.jvm.msteams.MSTeamsSink.KEY_PROXY_URL;
+
+public class MsTeamsSinkMigrationV1 implements IDataSinkMigrator {
+
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(
+        MSTeamsSink.ID,
+        SpServiceTagPrefix.DATA_SINK,
+        0,
+        1
+    );
+  }
+
+  @Override
+  public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation element,
+                                                     IDataSinkParameterExtractor extractor) throws RuntimeException {
+    var proxyConfiguration = StaticProperties.alternatives(
+        Labels.withId(KEY_PROXY_ALTERNATIVES),
+        Alternatives.from(Labels.withId(KEY_PROXY_DISABLED)),
+        Alternatives.from(Labels.withId(KEY_PROXY_ENABLED),
+            StaticProperties.group(Labels.withId(KEY_PROXY_GROUP),
+                StaticProperties.stringFreeTextProperty(Labels.withId(KEY_PROXY_URL))
+            )
+        ));
+    proxyConfiguration.getAlternatives().get(0).setSelected(true);
+    element.getStaticProperties().add(1, proxyConfiguration);
+    return MigrationResult.success(element);
+  }
+}
diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java
index 80c55bb74c..555a2c5d71 100644
--- a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java
+++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.sinks.notifications.jvm.msteams;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
 import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.pe.shared.PlaceholderExtractor;
 import org.apache.streampipes.sdk.StaticProperties;
@@ -30,18 +31,19 @@ import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.wrapper.params.compat.SinkParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpHost;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
@@ -49,18 +51,26 @@ import java.net.URL;
 
 public class MSTeamsSink extends StreamPipesNotificationSink {
 
+  public static final String ID = "org.apache.streampipes.sinks.notifications.jvm.msteams";
+
   private static final String KEY_MESSAGE_ADVANCED = "messageAdvanced";
   private static final String KEY_MESSAGE_ADVANCED_CONTENT = 
"messageContentAdvanced";
   private static final String KEY_MESSAGE_SIMPLE = "messageSimple";
   private static final String KEY_MESSAGE_SIMPLE_CONTENT = 
"messageContentSimple";
   private static final String KEY_MESSAGE_TYPE_ALTERNATIVES = "messageType";
   private static final String KEY_WEBHOOK_URL = "webhookUrl";
+  public static final String KEY_PROXY_ALTERNATIVES = "proxy";
+  public static final String KEY_PROXY_DISABLED = "proxyDisabled";
+  public static final String KEY_PROXY_ENABLED = "proxyEnabled";
+  public static final String KEY_PROXY_GROUP = "proxyConfigurationGroup";
+  public static final String KEY_PROXY_URL = "proxyUrl";
   protected static final String SIMPLE_MESSAGE_TEMPLATE = "{\"text\": \"%s\"}";
 
   private String messageContent;
   private boolean isSimpleMessageMode;
   private String webhookUrl;
   private ObjectMapper objectMapper;
+  private HttpClient httpClient;
 
   public MSTeamsSink() {
     super();
@@ -89,6 +99,17 @@ public class MSTeamsSink extends StreamPipesNotificationSink 
{
       isSimpleMessageMode = true;
       messageContent = 
extractor.singleValueParameter(KEY_MESSAGE_SIMPLE_CONTENT, String.class);
     }
+
+    var selectedProxyAlternative = extractor.selectedAlternativeInternalId(KEY_PROXY_ALTERNATIVES);
+    if (selectedProxyAlternative.equals(KEY_PROXY_DISABLED)) {
+      this.httpClient = HttpClients.createDefault();
+    } else {
+      var proxyUrl = extractor.singleValueParameter(KEY_PROXY_URL, String.class);
+      this.httpClient = HttpClientBuilder
+          .create()
+          .setProxy(HttpHost.create(proxyUrl))
+          .build();
+    }
   }
 
   @Override
@@ -104,16 +125,15 @@ public class MSTeamsSink extends 
StreamPipesNotificationSink {
     } else {
       teamsMessageContent = 
createMessageFromAdvancedContent(processedMessageContent);
     }
-    var client = HttpClientBuilder.create().useSystemProperties().build();
-    sendPayloadToWebhook(client, teamsMessageContent, webhookUrl);
+    sendPayloadToWebhook(httpClient, teamsMessageContent, webhookUrl);
   }
 
   @Override
   public DataSinkBuilder declareModelWithoutSilentPeriod() {
     return DataSinkBuilder
-        .create("org.apache.streampipes.sinks.notifications.jvm.msteams", 0)
+        .create(ID, 1)
         .withLocales(Locales.EN)
-        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .category(DataSinkType.NOTIFICATION)
         .requiredStream(
             StreamRequirementsBuilder
@@ -122,6 +142,14 @@ public class MSTeamsSink extends 
StreamPipesNotificationSink {
                 .build()
         )
         .requiredSecret(Labels.withId(KEY_WEBHOOK_URL))
+        .requiredAlternatives(
+            Labels.withId(KEY_PROXY_ALTERNATIVES),
+            Alternatives.from(Labels.withId(KEY_PROXY_DISABLED)),
+            Alternatives.from(Labels.withId(KEY_PROXY_ENABLED),
+                StaticProperties.group(Labels.withId(KEY_PROXY_GROUP),
+                    StaticProperties.stringFreeTextProperty(Labels.withId(KEY_PROXY_URL))
+                )
+            ))
         .requiredAlternatives(
             Labels.withId(KEY_MESSAGE_TYPE_ALTERNATIVES),
             Alternatives.from(
@@ -208,7 +236,7 @@ public class MSTeamsSink extends 
StreamPipesNotificationSink {
 
       var result = httpClient.execute(postRequest);
       if (result.getStatusLine()
-                .getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
+          .getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
         throw new SpRuntimeException(
             "The provided message payload was not accepted by the MS Teams 
API: %s"
                 .formatted(payload)
diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.msteams/strings.en b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.msteams/strings.en
index 1457a3c77f..bb7ebce47f 100644
--- a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.msteams/strings.en
+++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.msteams/strings.en
@@ -33,3 +33,10 @@ messageAdvanced.title=Advanced Message Format
 
 webhookUrl.title=Webhook URL
 webhookUrl.description=URL of the Webhook that allows to notifications to a 
dedicated channel in MS Teams.
+
+proxy.title=Proxy Settings
+proxy.description=Additional proxy server settings
+proxyDisabled.title=No proxy
+proxyEnabled.title=Use Proxy
+proxyUrl.title=URL
+proxyUrl.description=The URL of the proxy, e.g., http://proxy.com:80
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index 64aed3b49b..bd6876755a 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -56,7 +56,9 @@ public class DataProcessorInvocation extends InvocableStreamPipesEntity implemen
     this.setStreamRequirements(other.getSpDataStreams());
     this.setAppId(other.getAppId());
     this.setIncludesAssets(other.isIncludesAssets());
+    this.setIncludesLocales(other.isIncludesLocales());
     this.setIncludedAssets(other.getIncludedAssets());
+    this.setIncludedLocales(other.getIncludedLocales());
     this.setElementId(ElementIdGenerator.makeElementId(this));
     this.serviceTagPrefix = SpServiceTagPrefix.DATA_PROCESSOR;
   }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
index 4476c6651b..652180a6ed 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
@@ -51,8 +51,10 @@ public class DataSinkInvocation extends InvocableStreamPipesEntity {
     this.setStreamRequirements(other.getSpDataStreams());
     this.setAppId(other.getAppId());
     this.setIncludesAssets(other.isIncludesAssets());
+    this.setIncludesLocales(other.isIncludesLocales());
     this.setElementId(ElementIdGenerator.makeElementId(this));
     this.setIncludedAssets(other.getIncludedAssets());
+    this.setIncludedLocales(other.getIncludedLocales());
     this.serviceTagPrefix = SpServiceTagPrefix.DATA_SINK;
   }
 

Reply via email to