This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch 2914-feat-support-http-proxy-in-ms-teams-sink
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/2914-feat-support-http-proxy-in-ms-teams-sink by this push:
new 47c1d68e40 feat(#2914): Add proxy configuration to Teams sink
47c1d68e40 is described below
commit 47c1d68e403aa14c2b38837c17e0af1bd0e356a6
Author: Dominik Riemer <[email protected]>
AuthorDate: Mon Jun 3 15:33:32 2024 +0200
feat(#2914): Add proxy configuration to Teams sink
---
.../management/locales/LabelGenerator.java | 83 ++++++++++++++--------
.../jvm/NotificationsExtensionModuleExport.java | 8 ++-
.../jvm/migrations/MsTeamsSinkMigrationV1.java | 65 +++++++++++++++++
.../notifications/jvm/msteams/MSTeamsSink.java | 40 +++++++++--
.../strings.en | 7 ++
.../model/graph/DataProcessorInvocation.java | 2 +
.../model/graph/DataSinkInvocation.java | 2 +
7 files changed, 168 insertions(+), 39 deletions(-)
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/locales/LabelGenerator.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/locales/LabelGenerator.java
index 56a151a13b..7eb83a99bc 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/locales/LabelGenerator.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/locales/LabelGenerator.java
@@ -20,11 +20,14 @@ package
org.apache.streampipes.extensions.management.locales;
import org.apache.streampipes.extensions.api.assets.AssetResolver;
import org.apache.streampipes.extensions.api.assets.DefaultAssetResolver;
import org.apache.streampipes.model.base.ConsumableStreamPipesEntity;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.output.AppendOutputStrategy;
import org.apache.streampipes.model.output.FixedOutputStrategy;
+import org.apache.streampipes.model.output.OutputStrategy;
import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
@@ -32,6 +35,7 @@ import
org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
import java.io.IOException;
+import java.util.List;
import java.util.Properties;
public class LabelGenerator<T extends NamedStreamPipesEntity> {
@@ -84,40 +88,51 @@ public class LabelGenerator<T extends
NamedStreamPipesEntity> {
});
}
- if (isDataProcessor()) {
- ((DataProcessorDescription) desc).getOutputStrategies()
- .forEach(os -> {
- if (os instanceof AppendOutputStrategy) {
- ((AppendOutputStrategy) os).getEventProperties()
- .forEach(ep -> {
- ep.setLabel(getTitle(
- props,
- ep.getRuntimeId()
- ));
- ep.setDescription(getDescription(
- props,
- ep.getRuntimeId()
- ));
- });
- } else if (os instanceof FixedOutputStrategy) {
- ((FixedOutputStrategy) os).getEventProperties()
- .forEach(ep -> {
- ep.setLabel(getTitle(
- props,
- ep.getRuntimeId()
- ));
- ep.setDescription(getDescription(
- props,
- ep.getRuntimeId()
- ));
- });
- }
- });
+ if (isInvocable()) {
+ ((InvocableStreamPipesEntity) desc).getStaticProperties().forEach(sp
-> generateLabels(props, sp));
+ if (isDataProcessorInvocation()) {
+ applyOutputStrategies(((DataProcessorInvocation)
desc).getOutputStrategies(), props);
+ }
+ }
+
+ if (isDataProcessorDescription()) {
+ applyOutputStrategies(((DataProcessorDescription)
desc).getOutputStrategies(), props);
}
}
return desc;
}
+ private void applyOutputStrategies(List<OutputStrategy> outputStrategies,
+ Properties props) {
+ outputStrategies.forEach(os -> {
+ if (os instanceof AppendOutputStrategy) {
+ ((AppendOutputStrategy) os).getEventProperties()
+ .forEach(ep -> {
+ ep.setLabel(getTitle(
+ props,
+ ep.getRuntimeId()
+ ));
+ ep.setDescription(getDescription(
+ props,
+ ep.getRuntimeId()
+ ));
+ });
+ } else if (os instanceof FixedOutputStrategy) {
+ ((FixedOutputStrategy) os).getEventProperties()
+ .forEach(ep -> {
+ ep.setLabel(getTitle(
+ props,
+ ep.getRuntimeId()
+ ));
+ ep.setDescription(getDescription(
+ props,
+ ep.getRuntimeId()
+ ));
+ });
+ }
+ });
+ }
+
/**
* Returns the title of the element description based on the data of the
resource files
@@ -185,10 +200,18 @@ public class LabelGenerator<T extends
NamedStreamPipesEntity> {
return desc instanceof ConsumableStreamPipesEntity;
}
- private boolean isDataProcessor() {
+ private boolean isDataProcessorDescription() {
return desc instanceof DataProcessorDescription;
}
+ private boolean isDataProcessorInvocation() {
+ return desc instanceof DataProcessorInvocation;
+ }
+
+ private boolean isInvocable() {
+ return desc instanceof InvocableStreamPipesEntity;
+ }
+
private boolean isAdapter() {
return desc instanceof AdapterDescription;
}
diff --git
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/NotificationsExtensionModuleExport.java
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/NotificationsExtensionModuleExport.java
index 87cac004f7..e75a73f12a 100644
---
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/NotificationsExtensionModuleExport.java
+++
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/NotificationsExtensionModuleExport.java
@@ -23,6 +23,7 @@ import
org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport;
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import org.apache.streampipes.sinks.notifications.jvm.email.EmailSink;
+import
org.apache.streampipes.sinks.notifications.jvm.migrations.MsTeamsSinkMigrationV1;
import
org.apache.streampipes.sinks.notifications.jvm.migrations.OneSignalSinkMigrationV1;
import
org.apache.streampipes.sinks.notifications.jvm.migrations.SlackNotificationSinkMigrationV1;
import
org.apache.streampipes.sinks.notifications.jvm.migrations.TelegramSinkMigrationV1;
@@ -54,9 +55,10 @@ public class NotificationsExtensionModuleExport implements
IExtensionModuleExpor
@Override
public List<IModelMigrator<?, ?>> migrators() {
return List.of(
- new OneSignalSinkMigrationV1(),
- new SlackNotificationSinkMigrationV1(),
- new TelegramSinkMigrationV1()
+ new OneSignalSinkMigrationV1(),
+ new SlackNotificationSinkMigrationV1(),
+ new TelegramSinkMigrationV1(),
+ new MsTeamsSinkMigrationV1()
);
}
}
diff --git
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/MsTeamsSinkMigrationV1.java
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/MsTeamsSinkMigrationV1.java
new file mode 100644
index 0000000000..5f1c0c8d8b
--- /dev/null
+++
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/MsTeamsSinkMigrationV1.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.notifications.jvm.migrations;
+
+import
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Alternatives;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sinks.notifications.jvm.msteams.MSTeamsSink;
+
+import static
org.apache.streampipes.sinks.notifications.jvm.msteams.MSTeamsSink.KEY_PROXY_ALTERNATIVES;
+import static
org.apache.streampipes.sinks.notifications.jvm.msteams.MSTeamsSink.KEY_PROXY_DISABLED;
+import static
org.apache.streampipes.sinks.notifications.jvm.msteams.MSTeamsSink.KEY_PROXY_ENABLED;
+import static
org.apache.streampipes.sinks.notifications.jvm.msteams.MSTeamsSink.KEY_PROXY_GROUP;
+import static
org.apache.streampipes.sinks.notifications.jvm.msteams.MSTeamsSink.KEY_PROXY_URL;
+
+public class MsTeamsSinkMigrationV1 implements IDataSinkMigrator {
+
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ MSTeamsSink.ID,
+ SpServiceTagPrefix.DATA_SINK,
+ 0,
+ 1
+ );
+ }
+
+ @Override
+ public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation
element,
+
IDataSinkParameterExtractor extractor) throws RuntimeException {
+ var proxyConfiguration = StaticProperties.alternatives(
+ Labels.withId(KEY_PROXY_ALTERNATIVES),
+ Alternatives.from(Labels.withId(KEY_PROXY_DISABLED)),
+ Alternatives.from(Labels.withId(KEY_PROXY_ENABLED),
+ StaticProperties.group(Labels.withId(KEY_PROXY_GROUP),
+
StaticProperties.stringFreeTextProperty(Labels.withId(KEY_PROXY_URL))
+ )
+ ));
+ proxyConfiguration.getAlternatives().get(0).setSelected(true);
+ element.getStaticProperties().add(1, proxyConfiguration);
+ return MigrationResult.success(element);
+ }
+}
diff --git
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java
index 80c55bb74c..555a2c5d71 100644
---
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java
+++
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java
@@ -21,6 +21,7 @@ package
org.apache.streampipes.sinks.notifications.jvm.msteams;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.pe.shared.PlaceholderExtractor;
import org.apache.streampipes.sdk.StaticProperties;
@@ -30,18 +31,19 @@ import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.params.compat.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesNotificationSink;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
import java.io.IOException;
import java.net.MalformedURLException;
@@ -49,18 +51,26 @@ import java.net.URL;
public class MSTeamsSink extends StreamPipesNotificationSink {
+ public static final String ID =
"org.apache.streampipes.sinks.notifications.jvm.msteams";
+
private static final String KEY_MESSAGE_ADVANCED = "messageAdvanced";
private static final String KEY_MESSAGE_ADVANCED_CONTENT =
"messageContentAdvanced";
private static final String KEY_MESSAGE_SIMPLE = "messageSimple";
private static final String KEY_MESSAGE_SIMPLE_CONTENT =
"messageContentSimple";
private static final String KEY_MESSAGE_TYPE_ALTERNATIVES = "messageType";
private static final String KEY_WEBHOOK_URL = "webhookUrl";
+ public static final String KEY_PROXY_ALTERNATIVES = "proxy";
+ public static final String KEY_PROXY_DISABLED = "proxyDisabled";
+ public static final String KEY_PROXY_ENABLED = "proxyEnabled";
+ public static final String KEY_PROXY_GROUP = "proxyConfigurationGroup";
+ public static final String KEY_PROXY_URL = "proxyUrl";
protected static final String SIMPLE_MESSAGE_TEMPLATE = "{\"text\": \"%s\"}";
private String messageContent;
private boolean isSimpleMessageMode;
private String webhookUrl;
private ObjectMapper objectMapper;
+ private HttpClient httpClient;
public MSTeamsSink() {
super();
@@ -89,6 +99,17 @@ public class MSTeamsSink extends StreamPipesNotificationSink
{
isSimpleMessageMode = true;
messageContent =
extractor.singleValueParameter(KEY_MESSAGE_SIMPLE_CONTENT, String.class);
}
+
+ var selectedProxyAlternative =
extractor.selectedAlternativeInternalId(KEY_PROXY_ALTERNATIVES);
+ if (selectedProxyAlternative.equals(KEY_PROXY_DISABLED)) {
+ this.httpClient = HttpClients.createDefault();
+ } else {
+ var proxyUrl = extractor.singleValueParameter(KEY_PROXY_URL,
String.class);
+ this.httpClient = HttpClientBuilder
+ .create()
+ .setProxy(HttpHost.create(proxyUrl))
+ .build();
+ }
}
@Override
@@ -104,16 +125,15 @@ public class MSTeamsSink extends
StreamPipesNotificationSink {
} else {
teamsMessageContent =
createMessageFromAdvancedContent(processedMessageContent);
}
- var client = HttpClientBuilder.create().useSystemProperties().build();
- sendPayloadToWebhook(client, teamsMessageContent, webhookUrl);
+ sendPayloadToWebhook(httpClient, teamsMessageContent, webhookUrl);
}
@Override
public DataSinkBuilder declareModelWithoutSilentPeriod() {
return DataSinkBuilder
- .create("org.apache.streampipes.sinks.notifications.jvm.msteams", 0)
+ .create(ID, 1)
.withLocales(Locales.EN)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.category(DataSinkType.NOTIFICATION)
.requiredStream(
StreamRequirementsBuilder
@@ -122,6 +142,14 @@ public class MSTeamsSink extends
StreamPipesNotificationSink {
.build()
)
.requiredSecret(Labels.withId(KEY_WEBHOOK_URL))
+ .requiredAlternatives(
+ Labels.withId(KEY_PROXY_ALTERNATIVES),
+ Alternatives.from(Labels.withId(KEY_PROXY_DISABLED)),
+ Alternatives.from(Labels.withId(KEY_PROXY_ENABLED),
+ StaticProperties.group(Labels.withId(KEY_PROXY_GROUP),
+
StaticProperties.stringFreeTextProperty(Labels.withId(KEY_PROXY_URL))
+ )
+ ))
.requiredAlternatives(
Labels.withId(KEY_MESSAGE_TYPE_ALTERNATIVES),
Alternatives.from(
@@ -208,7 +236,7 @@ public class MSTeamsSink extends
StreamPipesNotificationSink {
var result = httpClient.execute(postRequest);
if (result.getStatusLine()
- .getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
+ .getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
throw new SpRuntimeException(
"The provided message payload was not accepted by the MS Teams
API: %s"
.formatted(payload)
diff --git
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.msteams/strings.en
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.msteams/strings.en
index 1457a3c77f..bb7ebce47f 100644
---
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.msteams/strings.en
+++
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.msteams/strings.en
@@ -33,3 +33,10 @@ messageAdvanced.title=Advanced Message Format
webhookUrl.title=Webhook URL
webhookUrl.description=URL of the Webhook that allows to notifications to a
dedicated channel in MS Teams.
+
+proxy.title=Proxy Settings
+proxy.description=Additional proxy server settings
+proxyDisabled.title=No proxy
+proxyEnabled.title=Use Proxy
+proxyUrl.title=URL
+proxyUrl.description=The URL of the proxy, e.g., http://proxy.com:80
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
index 64aed3b49b..bd6876755a 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataProcessorInvocation.java
@@ -56,7 +56,9 @@ public class DataProcessorInvocation extends
InvocableStreamPipesEntity implemen
this.setStreamRequirements(other.getSpDataStreams());
this.setAppId(other.getAppId());
this.setIncludesAssets(other.isIncludesAssets());
+ this.setIncludesLocales(other.isIncludesLocales());
this.setIncludedAssets(other.getIncludedAssets());
+ this.setIncludedLocales(other.getIncludedLocales());
this.setElementId(ElementIdGenerator.makeElementId(this));
this.serviceTagPrefix = SpServiceTagPrefix.DATA_PROCESSOR;
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
index 4476c6651b..652180a6ed 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/graph/DataSinkInvocation.java
@@ -51,8 +51,10 @@ public class DataSinkInvocation extends
InvocableStreamPipesEntity {
this.setStreamRequirements(other.getSpDataStreams());
this.setAppId(other.getAppId());
this.setIncludesAssets(other.isIncludesAssets());
+ this.setIncludesLocales(other.isIncludesLocales());
this.setElementId(ElementIdGenerator.makeElementId(this));
this.setIncludedAssets(other.getIncludedAssets());
+ this.setIncludedLocales(other.getIncludedLocales());
this.serviceTagPrefix = SpServiceTagPrefix.DATA_SINK;
}