tenthe commented on code in PR #2248: URL: https://github.com/apache/streampipes/pull/2248#discussion_r1410430477
########## streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.sinks.notifications.jvm.msteams; + +import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; +import org.apache.streampipes.model.DataSinkType; +import org.apache.streampipes.model.graph.DataSinkDescription; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.pe.shared.PlaceholderExtractor; +import org.apache.streampipes.sdk.StaticProperties; +import org.apache.streampipes.sdk.builder.DataSinkBuilder; +import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.helpers.Alternatives; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.utils.Assets; +import org.apache.streampipes.wrapper.params.compat.SinkParams; +import 
org.apache.streampipes.wrapper.standalone.StreamPipesDataSink; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpStatus; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClients; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; + +public class MSTeamsSink extends StreamPipesDataSink { + + private static final String KEY_MESSAGE_ADVANCED = "messageAdvanced"; + private static final String KEY_MESSAGE_ADVANCED_CONTENT = "messageContentAdvanced"; + private static final String KEY_MESSAGE_SIMPLE = "messageSimple"; + private static final String KEY_MESSAGE_SIMPLE_CONTENT = "messageContentSimple"; + private static final String KEY_MESSAGE_TYPE_ALTERNATIVES = "messageType"; + private static final String KEY_WEBHOOK_URL = "webhookUrl"; + protected static final String SIMPLE_MESSAGE_TEMPLATE = "{\"text\": \"%s\"}"; + + private String messageContent; + private boolean isSimpleMessageMode; + private String webhookUrl; + + @Override + public void onEvent(Event event) { + + // This sink allows to use placeholders for event properties when defining the message content in the UI + // Therefore, we need to replace these placeholders based on the actual event before actually sending the message + var processedMessageContent = PlaceholderExtractor.replacePlaceholders(event, messageContent); + + String teamsMessageContent; + if (isSimpleMessageMode) { + teamsMessageContent = createMessageFromSimpleContent(processedMessageContent); + } else { + try { + teamsMessageContent = createMessageFromAdvancedContent(processedMessageContent); + } catch (JsonProcessingException e) { + throw new RuntimeException( + "Advanced message content provided is not a valid JSON string: %s" + 
.formatted(messageContent) + ); + } + } + + try { Review Comment: Same as above ########## streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.streampipes.sinks.notifications.jvm.msteams; + +import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; +import org.apache.streampipes.model.DataSinkType; +import org.apache.streampipes.model.graph.DataSinkDescription; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.pe.shared.PlaceholderExtractor; +import org.apache.streampipes.sdk.StaticProperties; +import org.apache.streampipes.sdk.builder.DataSinkBuilder; +import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.helpers.Alternatives; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.utils.Assets; +import org.apache.streampipes.wrapper.params.compat.SinkParams; +import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpStatus; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClients; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; + +public class MSTeamsSink extends StreamPipesDataSink { + + private static final String KEY_MESSAGE_ADVANCED = "messageAdvanced"; + private static final String KEY_MESSAGE_ADVANCED_CONTENT = "messageContentAdvanced"; + private static final String KEY_MESSAGE_SIMPLE = "messageSimple"; + private static final String KEY_MESSAGE_SIMPLE_CONTENT = "messageContentSimple"; + private static final String KEY_MESSAGE_TYPE_ALTERNATIVES = "messageType"; + private static final String 
KEY_WEBHOOK_URL = "webhookUrl"; + protected static final String SIMPLE_MESSAGE_TEMPLATE = "{\"text\": \"%s\"}"; + + private String messageContent; + private boolean isSimpleMessageMode; + private String webhookUrl; + + @Override + public void onEvent(Event event) { + + // This sink allows to use placeholders for event properties when defining the message content in the UI + // Therefore, we need to replace these placeholders based on the actual event before actually sending the message + var processedMessageContent = PlaceholderExtractor.replacePlaceholders(event, messageContent); + + String teamsMessageContent; + if (isSimpleMessageMode) { + teamsMessageContent = createMessageFromSimpleContent(processedMessageContent); + } else { + try { Review Comment: I'd suggest moving the exception handling into the createMessageFromAdvancedContent method to make the onEvent method easier to read. ########## streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.streampipes.sinks.notifications.jvm.msteams; + +import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; +import org.apache.streampipes.model.DataSinkType; +import org.apache.streampipes.model.graph.DataSinkDescription; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.pe.shared.PlaceholderExtractor; +import org.apache.streampipes.sdk.StaticProperties; +import org.apache.streampipes.sdk.builder.DataSinkBuilder; +import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.helpers.Alternatives; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.utils.Assets; +import org.apache.streampipes.wrapper.params.compat.SinkParams; +import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpStatus; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClients; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; + +public class MSTeamsSink extends StreamPipesDataSink { + + private static final String KEY_MESSAGE_ADVANCED = "messageAdvanced"; + private static final String KEY_MESSAGE_ADVANCED_CONTENT = "messageContentAdvanced"; + private static final String KEY_MESSAGE_SIMPLE = "messageSimple"; + private static final String KEY_MESSAGE_SIMPLE_CONTENT = "messageContentSimple"; + private static final String KEY_MESSAGE_TYPE_ALTERNATIVES = "messageType"; + private static final String 
KEY_WEBHOOK_URL = "webhookUrl"; + protected static final String SIMPLE_MESSAGE_TEMPLATE = "{\"text\": \"%s\"}"; + + private String messageContent; + private boolean isSimpleMessageMode; + private String webhookUrl; + + @Override + public void onEvent(Event event) { + + // This sink allows to use placeholders for event properties when defining the message content in the UI + // Therefore, we need to replace these placeholders based on the actual event before actually sending the message + var processedMessageContent = PlaceholderExtractor.replacePlaceholders(event, messageContent); + + String teamsMessageContent; + if (isSimpleMessageMode) { + teamsMessageContent = createMessageFromSimpleContent(processedMessageContent); + } else { + try { + teamsMessageContent = createMessageFromAdvancedContent(processedMessageContent); + } catch (JsonProcessingException e) { + throw new RuntimeException( + "Advanced message content provided is not a valid JSON string: %s" + .formatted(messageContent) + ); + } + } + + try { + sendPayloadToWebhook(HttpClients.createDefault(), teamsMessageContent, webhookUrl); Review Comment: Why is the webhookUrl passed as a parameter? Can we use the local variable instead? ########## streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/msteams/MSTeamsSink.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.sinks.notifications.jvm.msteams; + +import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; +import org.apache.streampipes.model.DataSinkType; +import org.apache.streampipes.model.graph.DataSinkDescription; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.pe.shared.PlaceholderExtractor; +import org.apache.streampipes.sdk.StaticProperties; +import org.apache.streampipes.sdk.builder.DataSinkBuilder; +import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.helpers.Alternatives; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.utils.Assets; +import org.apache.streampipes.wrapper.params.compat.SinkParams; +import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpStatus; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClients; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; + +public class MSTeamsSink extends StreamPipesDataSink { 
+ + private static final String KEY_MESSAGE_ADVANCED = "messageAdvanced"; + private static final String KEY_MESSAGE_ADVANCED_CONTENT = "messageContentAdvanced"; + private static final String KEY_MESSAGE_SIMPLE = "messageSimple"; + private static final String KEY_MESSAGE_SIMPLE_CONTENT = "messageContentSimple"; + private static final String KEY_MESSAGE_TYPE_ALTERNATIVES = "messageType"; + private static final String KEY_WEBHOOK_URL = "webhookUrl"; + protected static final String SIMPLE_MESSAGE_TEMPLATE = "{\"text\": \"%s\"}"; + + private String messageContent; + private boolean isSimpleMessageMode; + private String webhookUrl; + + @Override + public void onEvent(Event event) { + + // This sink allows to use placeholders for event properties when defining the message content in the UI + // Therefore, we need to replace these placeholders based on the actual event before actually sending the message + var processedMessageContent = PlaceholderExtractor.replacePlaceholders(event, messageContent); + + String teamsMessageContent; + if (isSimpleMessageMode) { + teamsMessageContent = createMessageFromSimpleContent(processedMessageContent); + } else { + try { + teamsMessageContent = createMessageFromAdvancedContent(processedMessageContent); + } catch (JsonProcessingException e) { + throw new RuntimeException( + "Advanced message content provided is not a valid JSON string: %s" + .formatted(messageContent) + ); + } + } + + try { + sendPayloadToWebhook(HttpClients.createDefault(), teamsMessageContent, webhookUrl); + } catch (IOException e) { + throw new RuntimeException("Sending notification to MS Teams failed.", e); + } + } + + @Override + public DataSinkDescription declareModel() { + return DataSinkBuilder.create("org.apache.streampipes.sinks.notifications.jvm.msteams") + .withLocales(Locales.EN) + .withAssets(Assets.DOCUMENTATION, Assets.ICON) + .category(DataSinkType.NOTIFICATION) + .requiredStream( + StreamRequirementsBuilder + .create() + 
.requiredProperty(EpRequirements.anyProperty()) + .build() + ) + .requiredSecret(Labels.withId(KEY_WEBHOOK_URL)) + .requiredAlternatives( + Labels.withId(KEY_MESSAGE_TYPE_ALTERNATIVES), + Alternatives.from( + Labels.withId(KEY_MESSAGE_SIMPLE), + StaticProperties.stringFreeTextProperty( + Labels.withId(KEY_MESSAGE_SIMPLE_CONTENT), + true, + true + ), + true), + Alternatives.from( + Labels.withId(KEY_MESSAGE_ADVANCED), + StaticProperties.stringFreeTextProperty( + Labels.withId(KEY_MESSAGE_ADVANCED_CONTENT), + true, + true + ) + ) + ) + .build(); + } + + @Override + public void onInvocation( + SinkParams parameters, + EventSinkRuntimeContext runtimeContext + ) throws SpRuntimeException { + var extractor = parameters.extractor(); + webhookUrl = extractor.secretValue(KEY_WEBHOOK_URL); + + validateWebhookUrl(webhookUrl); + + var selectedAlternative = extractor.selectedAlternativeInternalId(KEY_MESSAGE_TYPE_ALTERNATIVES); + if (selectedAlternative.equals(KEY_MESSAGE_ADVANCED)) { + isSimpleMessageMode = false; + messageContent = extractor.singleValueParameter(KEY_MESSAGE_ADVANCED_CONTENT, String.class); + } else { + isSimpleMessageMode = true; + messageContent = extractor.singleValueParameter(KEY_MESSAGE_SIMPLE_CONTENT, String.class); + } + + } + + @Override + public void onDetach() { + // nothing to do + } + + /** + * Creates a JSON string intended for the MS Teams Webhook URL based on the provided plain message content. + * <p> + * This method utilizes a basic approach for constructing messages to be sent to MS Teams. + * If you intend to provide text in the form of Adaptive Cards, consider using + * {@link #createMessageFromAdvancedContent(String)} for a more advanced and interactive message format. + * </p> + * + * @param messageContent The plain message content to be included in the Teams message. + * @return A JSON string formatted using a predefined template with the provided message content. 
+ */ + protected String createMessageFromSimpleContent(String messageContent) { + return SIMPLE_MESSAGE_TEMPLATE.formatted(messageContent); + } + + /** + * Creates a message for MS Teams from a JSON string, specifically designed for use with Adaptive Cards. + * <p> + * This method takes a JSON string as input, which is expected to represent the content of the message. + * The content is directly forwarded to MS Teams, allowing for the utilization of Adaptive Cards. + * Adaptive Cards provide a flexible and interactive way to present content in Microsoft Teams. + * Learn more about Adaptive Cards: <a href="https://learn.microsoft.com/en-us/adaptive-cards/">here</a> + * </p> + * + * @param messageContent The JSON string representing the content of the message. + * @return The original JSON string, unchanged. + * @throws JsonProcessingException If the provided message is not a valid JSON string. + */ + protected String createMessageFromAdvancedContent(String messageContent) throws JsonProcessingException { + new ObjectMapper().readValue(messageContent, Object.class); Review Comment: Maybe we can keep a single ObjectMapper in a class-level field and reuse it instead of instantiating a new one on every event. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
