This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 161b5c84601c115f8297c7cc652da8a326aa50a9 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Thu Sep 15 15:36:54 2022 +0200 CAMEL-18506: replace async http client with Java's own HTTP client in camel-telegram --- .../apache/camel/catalog/components/telegram.json | 5 +- components/camel-telegram/pom.xml | 10 +- .../telegram/TelegramComponentConfigurer.java | 10 +- .../telegram/TelegramEndpointConfigurer.java | 9 +- .../telegram/TelegramEndpointUriFactory.java | 2 +- .../apache/camel/component/telegram/telegram.json | 7 +- .../component/telegram/TelegramComponent.java | 26 +- .../camel/component/telegram/TelegramEndpoint.java | 81 +--- .../telegram/service/OutgoingMessageHandler.java | 117 ++++++ .../telegram/service/TelegramAsyncHandler.java | 121 ++++++ .../telegram/service/TelegramBodyPublisher.java | 163 ++++++++ .../service/TelegramServiceRestBotAPIAdapter.java | 411 +++++++-------------- .../TelegramConsumerHealthCheckErrorTest.java | 2 +- .../telegram/TelegramWebhookRegistrationTest.java | 26 +- .../telegram/integration/TelegramServiceIT.java | 11 +- .../integration/TelegramServiceProxyIT.java | 6 + .../service/OutgoingMessageHandlerTest.java | 146 ++++++++ .../telegram/service/TelegramAsyncHandlerTest.java | 63 ++++ .../ROOT/pages/camel-3x-upgrade-guide-3_19.adoc | 8 + .../dsl/TelegramComponentBuilderFactory.java | 30 +- .../dsl/TelegramEndpointBuilderFactory.java | 207 ----------- 21 files changed, 817 insertions(+), 644 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/telegram.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/telegram.json index b7f172aaf57..ed7d8f4c8a2 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/telegram.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/telegram.json @@ -26,8 +26,7 @@ "lazyStartProducer": { "kind": "property", 
"displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during star [...] "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...] "baseUri": { "kind": "property", "displayName": "Base Uri", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "https:\/\/api.telegram.org", "description": "Can be used to set an alternative base URI, e.g. 
when you want to test the component against a mock Telegram API" }, - "client": { "kind": "property", "displayName": "Client", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.asynchttpclient.AsyncHttpClient", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom AsyncHttpClient" }, - "clientConfig": { "kind": "property", "displayName": "Client Config", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.asynchttpclient.AsyncHttpClientConfig", "deprecated": false, "autowired": false, "secret": false, "description": "To configure the AsyncHttpClient to use a custom com.ning.http.client.AsyncHttpClientConfig instance." }, + "client": { "kind": "property", "displayName": "Client", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.net.http.HttpClient", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom java.net.http.HttpClient" }, "authorizationToken": { "kind": "property", "displayName": "Authorization Token", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "The default Telegram authorization token to be used when the information is not provided in the endpoints." } }, "headers": { @@ -50,8 +49,6 @@ "chatId": { "kind": "parameter", "displayName": "Chat Id", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.telegram.TelegramConfiguration", "configurationField": "configuration", "description": "The identifier of the chat that will receive the produced messages. Chat ids can be first obtained from incoming messages [...] 
"lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may other [...] "baseUri": { "kind": "parameter", "displayName": "Base Uri", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.telegram.TelegramConfiguration", "configurationField": "configuration", "description": "Can be used to set an alternative base URI, e.g. when you want to test the component against a mock Telegram API" }, - "bufferSize": { "kind": "parameter", "displayName": "Buffer Size", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 4096, "description": "The initial in-memory buffer size used when transferring data between Camel and AHC Client." }, - "clientConfig": { "kind": "parameter", "displayName": "Client Config", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.asynchttpclient.AsyncHttpClientConfig", "deprecated": false, "autowired": false, "secret": false, "description": "To configure the AsyncHttpClient to use a custom com.ning.http.client.AsyncHttpClientConfig instance." 
}, "proxyHost": { "kind": "parameter", "displayName": "Proxy Host", "group": "proxy", "label": "proxy", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.telegram.TelegramConfiguration", "configurationField": "configuration", "description": "HTTP proxy host which could be used when sending out the message." }, "proxyPort": { "kind": "parameter", "displayName": "Proxy Port", "group": "proxy", "label": "proxy", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.telegram.TelegramConfiguration", "configurationField": "configuration", "description": "HTTP proxy port which could be used when sending out the message." }, "proxyType": { "kind": "parameter", "displayName": "Proxy Type", "group": "proxy", "label": "proxy", "required": false, "type": "object", "javaType": "org.apache.camel.component.telegram.TelegramProxyType", "enum": [ "HTTP", "SOCKS4", "SOCKS5" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "HTTP", "configurationClass": "org.apache.camel.component.telegram.TelegramConfiguration", "configurationField": "configuration", "description": "HTTP proxy type which [...] 
diff --git a/components/camel-telegram/pom.xml b/components/camel-telegram/pom.xml index 8aeca0ea89b..d0ece822fee 100644 --- a/components/camel-telegram/pom.xml +++ b/components/camel-telegram/pom.xml @@ -43,14 +43,14 @@ </dependency> <dependency> - <groupId>org.asynchttpclient</groupId> - <artifactId>async-http-client</artifactId> - <version>${ahc-version}</version> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> </dependency> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons-lang3-version}</version> </dependency> <dependency> diff --git a/components/camel-telegram/src/generated/java/org/apache/camel/component/telegram/TelegramComponentConfigurer.java b/components/camel-telegram/src/generated/java/org/apache/camel/component/telegram/TelegramComponentConfigurer.java index 8bc85a78005..36927aa1e55 100644 --- a/components/camel-telegram/src/generated/java/org/apache/camel/component/telegram/TelegramComponentConfigurer.java +++ b/components/camel-telegram/src/generated/java/org/apache/camel/component/telegram/TelegramComponentConfigurer.java @@ -29,9 +29,7 @@ public class TelegramComponentConfigurer extends PropertyConfigurerSupport imple case "baseUri": target.setBaseUri(property(camelContext, java.lang.String.class, value)); return true; case "bridgeerrorhandler": case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true; - case "client": target.setClient(property(camelContext, org.asynchttpclient.AsyncHttpClient.class, value)); return true; - case "clientconfig": - case "clientConfig": target.setClientConfig(property(camelContext, org.asynchttpclient.AsyncHttpClientConfig.class, value)); return true; + case "client": target.setClient(property(camelContext, java.net.http.HttpClient.class, value)); return true; case 
"lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; default: return false; @@ -49,9 +47,7 @@ public class TelegramComponentConfigurer extends PropertyConfigurerSupport imple case "baseUri": return java.lang.String.class; case "bridgeerrorhandler": case "bridgeErrorHandler": return boolean.class; - case "client": return org.asynchttpclient.AsyncHttpClient.class; - case "clientconfig": - case "clientConfig": return org.asynchttpclient.AsyncHttpClientConfig.class; + case "client": return java.net.http.HttpClient.class; case "lazystartproducer": case "lazyStartProducer": return boolean.class; default: return null; @@ -71,8 +67,6 @@ public class TelegramComponentConfigurer extends PropertyConfigurerSupport imple case "bridgeerrorhandler": case "bridgeErrorHandler": return target.isBridgeErrorHandler(); case "client": return target.getClient(); - case "clientconfig": - case "clientConfig": return target.getClientConfig(); case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); default: return null; diff --git a/components/camel-telegram/src/generated/java/org/apache/camel/component/telegram/TelegramEndpointConfigurer.java b/components/camel-telegram/src/generated/java/org/apache/camel/component/telegram/TelegramEndpointConfigurer.java index 03ee7a4eeff..99d044f693f 100644 --- a/components/camel-telegram/src/generated/java/org/apache/camel/component/telegram/TelegramEndpointConfigurer.java +++ b/components/camel-telegram/src/generated/java/org/apache/camel/component/telegram/TelegramEndpointConfigurer.java @@ -37,8 +37,7 @@ public class TelegramEndpointConfigurer extends PropertyConfigurerSupport implem case "bufferSize": target.setBufferSize(property(camelContext, int.class, value)); return true; case "chatid": case "chatId": target.getConfiguration().setChatId(property(camelContext, java.lang.String.class, value)); return true; - case "clientconfig": - 
case "clientConfig": target.setClientConfig(property(camelContext, org.asynchttpclient.AsyncHttpClientConfig.class, value)); return true; + case "client": target.setClient(property(camelContext, java.net.http.HttpClient.class, value)); return true; case "delay": target.setDelay(property(camelContext, long.class, value)); return true; case "exceptionhandler": case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true; @@ -99,8 +98,7 @@ public class TelegramEndpointConfigurer extends PropertyConfigurerSupport implem case "bufferSize": return int.class; case "chatid": case "chatId": return java.lang.String.class; - case "clientconfig": - case "clientConfig": return org.asynchttpclient.AsyncHttpClientConfig.class; + case "client": return java.net.http.HttpClient.class; case "delay": return long.class; case "exceptionhandler": case "exceptionHandler": return org.apache.camel.spi.ExceptionHandler.class; @@ -162,8 +160,7 @@ public class TelegramEndpointConfigurer extends PropertyConfigurerSupport implem case "bufferSize": return target.getBufferSize(); case "chatid": case "chatId": return target.getConfiguration().getChatId(); - case "clientconfig": - case "clientConfig": return target.getClientConfig(); + case "client": return target.getClient(); case "delay": return target.getDelay(); case "exceptionhandler": case "exceptionHandler": return target.getExceptionHandler(); diff --git a/components/camel-telegram/src/generated/java/org/apache/camel/component/telegram/TelegramEndpointUriFactory.java b/components/camel-telegram/src/generated/java/org/apache/camel/component/telegram/TelegramEndpointUriFactory.java index e6060c27620..ed6fed83247 100644 --- a/components/camel-telegram/src/generated/java/org/apache/camel/component/telegram/TelegramEndpointUriFactory.java +++ b/components/camel-telegram/src/generated/java/org/apache/camel/component/telegram/TelegramEndpointUriFactory.java @@ -30,7 
+30,7 @@ public class TelegramEndpointUriFactory extends org.apache.camel.support.compone props.add("bridgeErrorHandler"); props.add("bufferSize"); props.add("chatId"); - props.add("clientConfig"); + props.add("client"); props.add("delay"); props.add("exceptionHandler"); props.add("exchangePattern"); diff --git a/components/camel-telegram/src/generated/resources/org/apache/camel/component/telegram/telegram.json b/components/camel-telegram/src/generated/resources/org/apache/camel/component/telegram/telegram.json index b7f172aaf57..9bf97a82277 100644 --- a/components/camel-telegram/src/generated/resources/org/apache/camel/component/telegram/telegram.json +++ b/components/camel-telegram/src/generated/resources/org/apache/camel/component/telegram/telegram.json @@ -26,8 +26,7 @@ "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during star [...] "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...] 
"baseUri": { "kind": "property", "displayName": "Base Uri", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "https:\/\/api.telegram.org", "description": "Can be used to set an alternative base URI, e.g. when you want to test the component against a mock Telegram API" }, - "client": { "kind": "property", "displayName": "Client", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.asynchttpclient.AsyncHttpClient", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom AsyncHttpClient" }, - "clientConfig": { "kind": "property", "displayName": "Client Config", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.asynchttpclient.AsyncHttpClientConfig", "deprecated": false, "autowired": false, "secret": false, "description": "To configure the AsyncHttpClient to use a custom com.ning.http.client.AsyncHttpClientConfig instance." }, + "client": { "kind": "property", "displayName": "Client", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.net.http.HttpClient", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom java.net.http.HttpClient" }, "authorizationToken": { "kind": "property", "displayName": "Authorization Token", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "The default Telegram authorization token to be used when the information is not provided in the endpoints." 
} }, "headers": { @@ -50,8 +49,8 @@ "chatId": { "kind": "parameter", "displayName": "Chat Id", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.telegram.TelegramConfiguration", "configurationField": "configuration", "description": "The identifier of the chat that will receive the produced messages. Chat ids can be first obtained from incoming messages [...] "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may other [...] "baseUri": { "kind": "parameter", "displayName": "Base Uri", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.telegram.TelegramConfiguration", "configurationField": "configuration", "description": "Can be used to set an alternative base URI, e.g. when you want to test the component against a mock Telegram API" }, - "bufferSize": { "kind": "parameter", "displayName": "Buffer Size", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 4096, "description": "The initial in-memory buffer size used when transferring data between Camel and AHC Client." 
}, - "clientConfig": { "kind": "parameter", "displayName": "Client Config", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.asynchttpclient.AsyncHttpClientConfig", "deprecated": false, "autowired": false, "secret": false, "description": "To configure the AsyncHttpClient to use a custom com.ning.http.client.AsyncHttpClientConfig instance." }, + "bufferSize": { "kind": "parameter", "displayName": "Buffer Size", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1048576, "description": "The initial in-memory buffer size used when transferring data between Camel and AHC Client." }, + "client": { "kind": "parameter", "displayName": "Client", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.net.http.HttpClient", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HttpClient" }, "proxyHost": { "kind": "parameter", "displayName": "Proxy Host", "group": "proxy", "label": "proxy", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.telegram.TelegramConfiguration", "configurationField": "configuration", "description": "HTTP proxy host which could be used when sending out the message." }, "proxyPort": { "kind": "parameter", "displayName": "Proxy Port", "group": "proxy", "label": "proxy", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.telegram.TelegramConfiguration", "configurationField": "configuration", "description": "HTTP proxy port which could be used when sending out the message." 
}, "proxyType": { "kind": "parameter", "displayName": "Proxy Type", "group": "proxy", "label": "proxy", "required": false, "type": "object", "javaType": "org.apache.camel.component.telegram.TelegramProxyType", "enum": [ "HTTP", "SOCKS4", "SOCKS5" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "HTTP", "configurationClass": "org.apache.camel.component.telegram.TelegramConfiguration", "configurationField": "configuration", "description": "HTTP proxy type which [...] diff --git a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramComponent.java b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramComponent.java index 51e4edd14b9..f921982190a 100644 --- a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramComponent.java +++ b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramComponent.java @@ -16,14 +16,13 @@ */ package org.apache.camel.component.telegram; +import java.net.http.HttpClient; import java.util.Map; import org.apache.camel.Endpoint; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.DefaultComponent; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.AsyncHttpClientConfig; @Component("telegram") public class TelegramComponent extends DefaultComponent { @@ -33,9 +32,7 @@ public class TelegramComponent extends DefaultComponent { private String authorizationToken; @Metadata(label = "advanced") - private AsyncHttpClient client; - @Metadata(label = "advanced") - private AsyncHttpClientConfig clientConfig; + private HttpClient client; @Metadata(label = "advanced", defaultValue = BOT_API_DEFAULT_URL, description = "Can be used to set an alternative base URI, e.g. 
when you want to test the component against a mock Telegram API") @@ -61,7 +58,7 @@ public class TelegramComponent extends DefaultComponent { configuration.setBaseUri(baseUri); } - TelegramEndpoint endpoint = new TelegramEndpoint(uri, this, configuration, client, clientConfig); + TelegramEndpoint endpoint = new TelegramEndpoint(uri, this, configuration, client); configuration.setAuthorizationToken(authorizationToken); setProperties(endpoint, parameters); @@ -84,28 +81,17 @@ public class TelegramComponent extends DefaultComponent { this.authorizationToken = authorizationToken; } - public AsyncHttpClient getClient() { + public HttpClient getClient() { return client; } /** - * To use a custom {@link AsyncHttpClient} + * To use a custom {@link java.net.http.HttpClient} */ - public void setClient(AsyncHttpClient client) { + public void setClient(HttpClient client) { this.client = client; } - public AsyncHttpClientConfig getClientConfig() { - return clientConfig; - } - - /** - * To configure the AsyncHttpClient to use a custom com.ning.http.client.AsyncHttpClientConfig instance. 
- */ - public void setClientConfig(AsyncHttpClientConfig clientConfig) { - this.clientConfig = clientConfig; - } - public String getBaseUri() { return baseUri; } diff --git a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramEndpoint.java b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramEndpoint.java index 3c049efe52b..d7caad45755 100644 --- a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramEndpoint.java +++ b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramEndpoint.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.telegram; +import java.net.InetSocketAddress; +import java.net.ProxySelector; +import java.net.http.HttpClient; import java.util.Collections; import java.util.List; @@ -32,12 +35,6 @@ import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.support.ScheduledPollEndpoint; import org.apache.camel.util.ObjectHelper; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.AsyncHttpClientConfig; -import org.asynchttpclient.DefaultAsyncHttpClient; -import org.asynchttpclient.DefaultAsyncHttpClientConfig; -import org.asynchttpclient.proxy.ProxyServer; -import org.asynchttpclient.proxy.ProxyType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,34 +49,29 @@ public class TelegramEndpoint extends ScheduledPollEndpoint implements WebhookCa @UriParam private TelegramConfiguration configuration; @UriParam(label = "advanced") - private AsyncHttpClientConfig clientConfig; - @UriParam(label = "advanced", defaultValue = "" + (4 * 1024)) - private int bufferSize = 4 * 1024; + private HttpClient client; + @UriParam(label = "advanced", defaultValue = "" + (1024 * 1024)) + private int bufferSize = 1024 * 1024; private WebhookConfiguration webhookConfiguration; - private AsyncHttpClient client; private TelegramService telegramService; public 
TelegramEndpoint( String endpointUri, Component component, TelegramConfiguration configuration, - AsyncHttpClient client, - AsyncHttpClientConfig clientConfig) { + HttpClient client) { super(endpointUri, component); this.configuration = configuration; this.client = client; - this.clientConfig = clientConfig; } @Override protected void doStart() throws Exception { super.doStart(); if (client == null) { - DefaultAsyncHttpClientConfig.Builder builder = clientConfig != null - ? new DefaultAsyncHttpClientConfig.Builder(clientConfig) - : new DefaultAsyncHttpClientConfig.Builder(); + HttpClient.Builder builder = HttpClient.newBuilder(); if (configuration != null && ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { @@ -87,32 +79,21 @@ public class TelegramEndpoint extends ScheduledPollEndpoint implements WebhookCa configuration.getProxyType(), configuration.getProxyHost(), configuration.getProxyPort()); - builder.setProxyServer( - new ProxyServer.Builder(configuration.getProxyHost(), configuration.getProxyPort()) - .setProxyType(getProxyType(configuration.getProxyType())).build()); + + builder.proxy( + ProxySelector.of(new InetSocketAddress(configuration.getProxyHost(), configuration.getProxyPort()))); } - final AsyncHttpClientConfig config = builder.build(); - client = new DefaultAsyncHttpClient(config); + + client = builder.build(); } if (telegramService == null) { telegramService = new TelegramServiceRestBotAPIAdapter( client, - bufferSize, configuration.getBaseUri(), - configuration.getAuthorizationToken()); + configuration.getAuthorizationToken(), bufferSize); } } - @Override - protected void doStop() throws Exception { - super.doStop(); - // ensure client is closed when stopping - if (client != null && !client.isClosed()) { - client.close(); - } - client = null; - } - @Override public Producer createProducer() throws Exception { return new TelegramProducer(this); @@ -170,28 +151,17 @@ public class 
TelegramEndpoint extends ScheduledPollEndpoint implements WebhookCa return telegramService; } - public AsyncHttpClient getClient() { + public HttpClient getClient() { return client; } /** - * To use a custom {@link AsyncHttpClient} + * To use a custom {@link HttpClient} */ - public void setClient(AsyncHttpClient client) { + public void setClient(HttpClient client) { this.client = client; } - public AsyncHttpClientConfig getClientConfig() { - return clientConfig; - } - - /** - * To configure the AsyncHttpClient to use a custom com.ning.http.client.AsyncHttpClientConfig instance. - */ - public void setClientConfig(AsyncHttpClientConfig clientConfig) { - this.clientConfig = clientConfig; - } - public int getBufferSize() { return bufferSize; } @@ -202,21 +172,4 @@ public class TelegramEndpoint extends ScheduledPollEndpoint implements WebhookCa public void setBufferSize(int bufferSize) { this.bufferSize = bufferSize; } - - private ProxyType getProxyType(TelegramProxyType type) { - if (type == null) { - return ProxyType.HTTP; - } - - switch (type) { - case HTTP: - return ProxyType.HTTP; - case SOCKS4: - return ProxyType.SOCKS_V4; - case SOCKS5: - return ProxyType.SOCKS_V5; - default: - throw new IllegalArgumentException("Unknown proxy type: " + type); - } - } } diff --git a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/service/OutgoingMessageHandler.java b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/service/OutgoingMessageHandler.java new file mode 100644 index 00000000000..0d227168c8e --- /dev/null +++ b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/service/OutgoingMessageHandler.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.telegram.service; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.concurrent.ExecutionException; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.component.telegram.model.MessageResult; +import org.apache.camel.component.telegram.model.OutgoingMessage; + +abstract class OutgoingMessageHandler<T extends OutgoingMessage> { + protected final ObjectMapper mapper; + protected final TelegramBodyPublisher bodyPublisher; + + private final HttpClient client; + private final String contentType; + private final String uri; + private final Class<? extends MessageResult> resultClass; + + public OutgoingMessageHandler(HttpClient client, ObjectMapper mapper, String uri, + String contentType, Class<? 
extends MessageResult> resultClass, int bufferSize) { + this.client = client; + this.mapper = mapper; + this.uri = uri; + this.contentType = contentType; + this.resultClass = resultClass; + + bodyPublisher = new TelegramBodyPublisher(bufferSize); + } + + public void sendMessage(Exchange exchange, AsyncCallback callback, T message) { + HttpRequest.Builder builder = HttpRequest.newBuilder().uri(URI.create(uri)); + + addBody(message); + + if (bodyPublisher.getBodyParts().size() > 1) { + builder.setHeader("Content-type", "multipart/form-data; boundary=\"" + bodyPublisher.getBoundary() + "\""); + } else { + if (contentType != null) { + builder.setHeader("Content-type", contentType); + } + } + + builder.setHeader("Accept", "application/json"); + + builder.POST(bodyPublisher.newPublisher()); + try { + final TelegramAsyncHandler telegramAsyncHandler + = new TelegramAsyncHandler(uri, resultClass, mapper, exchange, callback); + + client.sendAsync(builder.build(), HttpResponse.BodyHandlers.ofInputStream()) + .thenApply(telegramAsyncHandler::handleCompressedResponse).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + protected abstract void addBody(T message); + + protected void fillCommonMediaParts(OutgoingMessage message) { + + buildTextPart("chat_id", message.getChatId()); + buildTextPart("reply_to_message_id", message.getReplyToMessageId()); + buildTextPart("disable_notification", message.getDisableNotification()); + } + + protected <T> void buildTextPart(String name, T value) { + buildTextPart(bodyPublisher, name, value); + } + + static <T> void buildTextPart(TelegramBodyPublisher bodyPublisher, String name, T value) { + if (value != null) { + TelegramBodyPublisher.MultilineBodyPart<T> bodyPart + = new TelegramBodyPublisher.MultilineBodyPart<>(name, value, "text/plain"); + + bodyPublisher.addBodyPart(bodyPart); + } + } + + 
protected void buildMediaPart(String name, String fileNameWithExtension, byte[] value) { + buildMediaPart(bodyPublisher, name, fileNameWithExtension, value); + } + + void buildMediaPart(TelegramBodyPublisher bodyPublisher, String name, String fileNameWithExtension, byte[] value) { + TelegramBodyPublisher.MultilineBodyPart<byte[]> bodyPart + = new TelegramBodyPublisher.MultilineBodyPart<>(name, value, "application/octet-stream", null); + + bodyPart.addHeader("filename", fileNameWithExtension); + + bodyPublisher.addBodyPart(bodyPart); + } + +} diff --git a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/service/TelegramAsyncHandler.java b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/service/TelegramAsyncHandler.java new file mode 100644 index 00000000000..3b180356d1a --- /dev/null +++ b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/service/TelegramAsyncHandler.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.component.telegram.service; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.UnsupportedEncodingException; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.telegram.model.MessageResult; +import org.apache.camel.support.GZIPHelper; +import org.apache.camel.util.IOHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TelegramAsyncHandler { + private static final Logger LOG = LoggerFactory.getLogger(TelegramAsyncHandler.class); + + private final String uri; + private final Class<? extends MessageResult> resultClass; + private final ObjectMapper mapper; + private final Exchange exchange; + private final AsyncCallback callback; + + TelegramAsyncHandler(String uri, Class<? 
extends MessageResult> resultClass, ObjectMapper mapper, Exchange exchange, + AsyncCallback callback) { + this.uri = uri; + this.resultClass = resultClass; + this.mapper = mapper; + this.exchange = exchange; + this.callback = callback; + } + + public static String extractCharset(String line, String defaultValue) { + if (line == null) { + return defaultValue; + } + + final String[] parts = line.split(" "); + String charsetInfo = ""; + + for (var part : parts) { + if (part.startsWith("charset")) { + charsetInfo = part; + break; + } + } + + final String charset = charsetInfo.replace("charset=", "").replace(";", ""); + + if (charset.isBlank()) { + return defaultValue; + } + + return charset; + + } + + public Object handleCompressedResponse(HttpResponse<InputStream> response) { + Object result; + final boolean success = response.statusCode() >= 200 && response.statusCode() < 300; + + String charsetInfo = response.headers().firstValue("Content-Type").orElse(null); + final String charset = extractCharset(charsetInfo, StandardCharsets.UTF_8.name()); + + final String contentEncoding = response.headers().firstValue("Content-Encoding").orElse(null); + + try (InputStream is = GZIPHelper.uncompressGzip(contentEncoding, response.body()); + Reader r = new InputStreamReader(is, charset)) { + + if (LOG.isDebugEnabled()) { + response.headers().map().forEach((key, value) -> LOG.debug("header {}={}", key, value)); + } + + if (success) { + if (LOG.isTraceEnabled()) { + final String body = IOHelper.toString(r); + LOG.trace("Received body for {}: {}", uri, body); + result = mapper.readValue(body, resultClass); + } else { + result = mapper.readValue(r, resultClass); + } + + exchange.getMessage().setBody(result); + } else { + throw new RuntimeCamelException( + uri + " responded: " + response.statusCode() + IOHelper.toString(r)); + } + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException(ex); + } catch (IOException ex) { + throw new RuntimeCamelException("Could not parse 
the response from " + uri, ex); + } finally { + callback.done(false); + } + + return exchange; + } +} diff --git a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/service/TelegramBodyPublisher.java b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/service/TelegramBodyPublisher.java new file mode 100644 index 00000000000..9590bbcd0b5 --- /dev/null +++ b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/service/TelegramBodyPublisher.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.component.telegram.service; + +import java.net.http.HttpRequest; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TelegramBodyPublisher { + private static final Logger LOG = LoggerFactory.getLogger(TelegramBodyPublisher.class); + + private final Set<TelegramBodyPart> bodyParts = new LinkedHashSet<>(); + private final String boundary = RandomStringUtils.randomAlphanumeric(12); + private final int bufferSize; + + public TelegramBodyPublisher(int bufferSize) { + this.bufferSize = bufferSize; + } + + public interface TelegramBodyPart { + void serialize(ByteBuffer buffer, String separator); + } + + public static class SingleBodyPart implements TelegramBodyPart { + private final String body; + + public SingleBodyPart(String body) { + this.body = body; + } + + @Override + public void serialize(ByteBuffer buffer, String boundary) { + buffer.put(body.getBytes()); + } + + } + + public static class MultilineBodyPart<T> implements TelegramBodyPart { + private final String contentType; + private final Map<String, Object> headers = new LinkedHashMap<>(); + private final T body; + private final String charset; + + public MultilineBodyPart(String name, T body, String contentType) { + this(name, body, contentType, StandardCharsets.UTF_8.name()); + } + + public MultilineBodyPart(String name, T body, String contentType, String charset) { + this.body = body; + this.contentType = contentType; + this.charset = charset; + + addHeader("name", name); + } + + public void addHeader(String key, String value) { + headers.put(key, value); + } + + @Override + public void serialize(ByteBuffer buffer, String boundary) { + String partHeader = "--" + boundary + "\r\n"; + 
buffer.put(partHeader.getBytes()); + + String contentDisposition = "Content-Disposition: form-data; "; + + // this creates the key-pair part of the content disposition (i.e.: name="myName"; file="myFile.doc") + contentDisposition += headers.entrySet() + .stream() + .map(e -> e.getKey().toLowerCase() + "=\"" + e.getValue().toString() + "\"") + .collect(Collectors.joining("; ")) + "\r\n"; + buffer.put(contentDisposition.getBytes()); + + String contentTypePart = "Content-Type: " + contentType; + if (charset != null) { + contentTypePart += "; charset=" + charset; + } + contentTypePart += "\r\n\r\n"; + + buffer.put(contentTypePart.getBytes()); + + if (body instanceof String) { + buffer.put(((String) body).getBytes()); + } else { + if (body instanceof byte[]) { + buffer.put((byte[]) body); + } + } + + buffer.put("\r\n".getBytes()); + } + + public static void serializeEnd(ByteBuffer buffer, String separator) { + String partHeader = "--" + separator + "--\r\n"; + buffer.put(partHeader.getBytes()); + } + } + + public void addBodyPart(TelegramBodyPart bodyPart) { + bodyParts.add(bodyPart); + } + + public HttpRequest.BodyPublisher newPublisher() { + ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + LOG.debug("Allocating {} bytes", bufferSize); + serialize(bodyParts, buffer, boundary); + int written = buffer.capacity() - buffer.remaining(); + + return HttpRequest.BodyPublishers.ofByteArray(buffer.array(), 0, written); + } + + static void serialize(Set<TelegramBodyPart> bodyParts, ByteBuffer buffer, String separator) { + try { + boolean isMultiBody = false; + + for (TelegramBodyPart bodyPart : bodyParts) { + bodyPart.serialize(buffer, separator); + if (bodyPart instanceof MultilineBodyPart) { + isMultiBody = true; + } + } + + if (isMultiBody) { + MultilineBodyPart.serializeEnd(buffer, separator); + } + + } finally { + bodyParts.clear(); + } + } + + Set<TelegramBodyPart> getBodyParts() { + return bodyParts; + } + + String getBoundary() { + return boundary; + } +} diff --git 
a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/service/TelegramServiceRestBotAPIAdapter.java b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/service/TelegramServiceRestBotAPIAdapter.java index 4a1934e1f3f..2436b59f67a 100644 --- a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/service/TelegramServiceRestBotAPIAdapter.java +++ b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/service/TelegramServiceRestBotAPIAdapter.java @@ -16,21 +16,18 @@ */ package org.apache.camel.component.telegram.service; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutionException; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import io.netty.handler.codec.http.HttpHeaders; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; @@ -61,23 +58,10 @@ import org.apache.camel.component.telegram.model.SendVenueMessage; import org.apache.camel.component.telegram.model.StopMessageLiveLocationMessage; import org.apache.camel.component.telegram.model.UpdateResult; import org.apache.camel.component.telegram.model.WebhookResult; -import org.apache.camel.support.GZIPHelper; -import org.apache.camel.util.IOHelper; -import org.asynchttpclient.AsyncHandler; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.HttpResponseBodyPart; -import 
org.asynchttpclient.HttpResponseStatus; -import org.asynchttpclient.Request; -import org.asynchttpclient.RequestBuilder; -import org.asynchttpclient.Response; -import org.asynchttpclient.request.body.multipart.ByteArrayPart; -import org.asynchttpclient.request.body.multipart.StringPart; +import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.asynchttpclient.util.HttpUtils.extractContentTypeCharsetAttribute; -import static org.asynchttpclient.util.MiscUtils.withDefault; - /** * Adapts the {@code RestBotAPI} to the {@code TelegramService} interface. */ @@ -85,94 +69,104 @@ public class TelegramServiceRestBotAPIAdapter implements TelegramService { private static final Logger LOG = LoggerFactory.getLogger(TelegramServiceRestBotAPIAdapter.class); private final Map<Class<?>, OutgoingMessageHandler<?>> handlers; - private final AsyncHttpClient asyncHttpClient; + private final HttpClient client; + + @Deprecated private final ObjectMapper mapper; private final String baseUri; - public TelegramServiceRestBotAPIAdapter(AsyncHttpClient asyncHttpClient, int bufferSize, String telegramBaseUri, - String authorizationToken) { - this.asyncHttpClient = asyncHttpClient; + public TelegramServiceRestBotAPIAdapter(HttpClient client, String telegramBaseUri, + String authorizationToken, int bufferSize) { + this.client = client; + this.baseUri = telegramBaseUri + "/bot" + authorizationToken; this.mapper = new ObjectMapper(); final Map<Class<?>, OutgoingMessageHandler<?>> m = new HashMap<>(); - m.put(OutgoingTextMessage.class, - new OutgoingPlainMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri + "/sendMessage")); - m.put(OutgoingPhotoMessage.class, new OutgoingPhotoMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri)); - m.put(OutgoingAudioMessage.class, new OutgoingAudioMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri)); - m.put(OutgoingVideoMessage.class, new 
OutgoingVideoMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri)); - m.put(OutgoingDocumentMessage.class, new OutgoingDocumentMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri)); - m.put(OutgoingStickerMessage.class, new OutgoingStickerMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri)); - m.put(OutgoingGameMessage.class, - new OutgoingPlainMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri + "/sendGame")); + m.put(OutgoingTextMessage.class, new OutgoingPlainMessageHandler(client, mapper, baseUri + "/sendMessage", bufferSize)); + m.put(OutgoingPhotoMessage.class, new OutgoingPhotoMessageHandler(client, mapper, baseUri, bufferSize)); + m.put(OutgoingAudioMessage.class, new OutgoingAudioMessageHandler(client, mapper, baseUri, bufferSize)); + m.put(OutgoingVideoMessage.class, new OutgoingVideoMessageHandler(client, mapper, baseUri, bufferSize)); + m.put(OutgoingDocumentMessage.class, new OutgoingDocumentMessageHandler(client, mapper, baseUri, bufferSize)); + m.put(OutgoingStickerMessage.class, new OutgoingStickerMessageHandler(client, mapper, baseUri, bufferSize)); + m.put(OutgoingGameMessage.class, new OutgoingPlainMessageHandler(client, mapper, baseUri + "/sendGame", bufferSize)); m.put(SendLocationMessage.class, - new OutgoingPlainMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri + "/sendLocation")); + new OutgoingPlainMessageHandler(client, mapper, baseUri + "/sendLocation", bufferSize)); m.put(EditMessageLiveLocationMessage.class, - new OutgoingPlainMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri + "/editMessageLiveLocation")); + new OutgoingPlainMessageHandler(client, mapper, baseUri + "/editMessageLiveLocation", bufferSize)); m.put(StopMessageLiveLocationMessage.class, - new OutgoingPlainMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri + "/stopMessageLiveLocation")); + new OutgoingPlainMessageHandler(client, mapper, baseUri + "/stopMessageLiveLocation", bufferSize)); 
m.put(SendVenueMessage.class, - new OutgoingPlainMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri + "/sendVenue")); + new OutgoingPlainMessageHandler(client, mapper, baseUri + "/sendVenue", bufferSize)); m.put(EditMessageTextMessage.class, - new OutgoingPlainMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri + "/editMessageText")); + new OutgoingPlainMessageHandler(client, mapper, baseUri + "/editMessageText", bufferSize)); m.put(EditMessageCaptionMessage.class, - new OutgoingPlainMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri + "/editMessageCaption")); + new OutgoingPlainMessageHandler(client, mapper, baseUri + "/editMessageCaption", bufferSize)); m.put(EditMessageMediaMessage.class, - new OutgoingPlainMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri + "/editMessageMedia")); + new OutgoingPlainMessageHandler(client, mapper, baseUri + "/editMessageMedia", bufferSize)); m.put(EditMessageDelete.class, new OutgoingPlainMessageHandler( - asyncHttpClient, bufferSize, mapper, baseUri + "/deleteMessage")); + client, + mapper, baseUri + "/deleteMessage", bufferSize)); m.put(EditMessageReplyMarkupMessage.class, - new OutgoingPlainMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri + "/editMessageReplyMarkup")); + new OutgoingPlainMessageHandler(client, mapper, baseUri + "/editMessageReplyMarkup", bufferSize)); m.put(OutgoingCallbackQueryMessage.class, new OutgoingPlainMessageHandler( - asyncHttpClient, bufferSize, mapper, baseUri + "/answerCallbackQuery")); + client, + mapper, baseUri + "/answerCallbackQuery", bufferSize)); m.put(OutgoingSetGameScoreMessage.class, - new OutgoingPlainMessageHandler(asyncHttpClient, bufferSize, mapper, baseUri + "/setGameScore")); + new OutgoingPlainMessageHandler(client, mapper, baseUri + "/setGameScore", bufferSize)); m.put(OutgoingGetGameHighScoresMessage.class, new OutgoingPlainMessageHandler( - asyncHttpClient, bufferSize, mapper, baseUri + "/getGameHighScores", 
MessageResultGameScores.class)); + client, + mapper, baseUri + "/getGameHighScores", MessageResultGameScores.class, bufferSize)); m.put(OutgoingAnswerInlineQuery.class, new OutgoingPlainMessageHandler( - asyncHttpClient, bufferSize, mapper, baseUri + "/answerInlineQuery")); + client, + mapper, baseUri + "/answerInlineQuery", bufferSize)); this.handlers = m; } @Override public UpdateResult getUpdates(Long offset, Integer limit, Integer timeoutSeconds) { - final String uri = baseUri + "/getUpdates"; - final RequestBuilder request = new RequestBuilder("GET") - .setUrl(uri); + Map<String, Object> parameters = new HashMap<>(); if (offset != null) { - request.addQueryParam("offset", String.valueOf(offset)); + parameters.put("offset", String.valueOf(offset)); } if (limit != null) { - request.addQueryParam("limit", String.valueOf(limit)); + parameters.put("limit", String.valueOf(limit)); } if (timeoutSeconds != null) { - request.addQueryParam("timeout", String.valueOf(timeoutSeconds)); + parameters.put("timeout", String.valueOf(timeoutSeconds)); + } + + try { + String uri = URISupport.appendParametersToURI(baseUri + "/getUpdates", parameters); + + final HttpRequest request = HttpRequest.newBuilder().uri(URI.create(uri)).GET().build(); + return sendSyncRequest(request, UpdateResult.class); + } catch (URISyntaxException | UnsupportedEncodingException e) { + throw new RuntimeException(e); } - return sendSyncRequest(request.build(), UpdateResult.class); } - <T> T sendSyncRequest(final Request request, Class<T> resultType) { + <T> T sendSyncRequest(final HttpRequest request, Class<T> resultType) { try { - final Response response = asyncHttpClient.executeRequest(request).get(); - int code = response.getStatusCode(); + final HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString()); + + int code = response.statusCode(); if (code >= 200 && code < 300) { - try { - final String responseBody = response.getResponseBody(); - if (LOG.isTraceEnabled()) { 
- LOG.trace("Received body for {} {}: {}", request.getMethod(), request.getUrl(), responseBody); - } - return mapper.readValue(responseBody, resultType); - } catch (IOException e) { - throw new RuntimeCamelException( - "Could not parse the response from " + request.getMethod() + " " + request.getUrl(), e); + final String responseBody = response.body(); + if (LOG.isTraceEnabled()) { + LOG.trace("Received body for {} {}: {}", request.method(), request.uri(), responseBody); } + return mapper.readValue(responseBody, resultType); } else { throw new TelegramException( - "Could not " + request.getMethod() + " " + request.getUrl() + ": " + response.getStatusCode() + " " - + response.getStatusText(), - response.getStatusCode(), response.getStatusText()); + "Could not " + request.method() + " " + request.uri() + ": " + response.statusCode() + " " + + response.body(), + response.statusCode(), response.body()); } - } catch (ExecutionException e) { - throw new RuntimeCamelException("Could not request " + request.getMethod() + " " + request.getUrl(), e); + } catch (JsonProcessingException e) { + throw new RuntimeCamelException( + "Could not parse the response from " + request.method() + " " + request.uri(), e); + } catch (IOException e) { + throw new RuntimeCamelException("Could not request " + request.method() + " " + request.uri(), e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeCamelException(e); @@ -182,8 +176,9 @@ public class TelegramServiceRestBotAPIAdapter implements TelegramService { @Override public boolean setWebhook(String url) { final String uri = baseUri + "/setWebhook?url=" + url; - final RequestBuilder request = new RequestBuilder("GET") - .setUrl(uri); + + final HttpRequest.Builder request = HttpRequest.newBuilder().uri(URI.create(uri)).GET(); + WebhookResult res = sendSyncRequest(request.build(), WebhookResult.class); return res.isOk() && res.isResult(); } @@ -207,21 +202,23 @@ public class 
TelegramServiceRestBotAPIAdapter implements TelegramService { static class OutgoingPlainMessageHandler extends OutgoingMessageHandler<OutgoingMessage> { - public OutgoingPlainMessageHandler(AsyncHttpClient asyncHttpClient, int bufferSize, ObjectMapper mapper, - String uri, Class<? extends MessageResult> returnType) { - super(asyncHttpClient, bufferSize, mapper, uri, "application/json", returnType); + public OutgoingPlainMessageHandler(HttpClient client, ObjectMapper mapper, + String uri, Class<? extends MessageResult> returnType, int bufferSize) { + super(client, mapper, uri, "application/json", returnType, bufferSize); } - public OutgoingPlainMessageHandler(AsyncHttpClient asyncHttpClient, int bufferSize, ObjectMapper mapper, - String uri) { - this(asyncHttpClient, bufferSize, mapper, uri, MessageResult.class); + public OutgoingPlainMessageHandler(HttpClient client, ObjectMapper mapper, + String uri, int bufferSize) { + this(client, mapper, uri, MessageResult.class, bufferSize); } @Override - protected void addBody(RequestBuilder builder, OutgoingMessage message) { + protected void addBody(OutgoingMessage message) { try { final String body = mapper.writeValueAsString(message); - builder.setBody(body); + + bodyPublisher.addBodyPart(new TelegramBodyPublisher.SingleBodyPart(body)); + } catch (JsonProcessingException e) { throw new RuntimeCamelException("Could not serialize " + message); } @@ -231,251 +228,91 @@ public class TelegramServiceRestBotAPIAdapter implements TelegramService { static class OutgoingAudioMessageHandler extends OutgoingMessageHandler<OutgoingAudioMessage> { - public OutgoingAudioMessageHandler(AsyncHttpClient asyncHttpClient, int bufferSize, ObjectMapper mapper, - String baseUri) { - super(asyncHttpClient, bufferSize, mapper, baseUri + "/sendAudio", null, MessageResult.class); + public OutgoingAudioMessageHandler(HttpClient client, ObjectMapper mapper, + String baseUri, int bufferSize) { + super(client, mapper, baseUri + "/sendAudio", null, 
MessageResult.class, bufferSize); } @Override - protected void addBody(RequestBuilder builder, OutgoingAudioMessage message) { - fillCommonMediaParts(builder, message); - buildMediaPart(builder, "audio", message.getFilenameWithExtension(), message.getAudio()); - buildTextPart(builder, "title", message.getTitle()); - buildTextPart(builder, "duration", message.getDurationSeconds()); - buildTextPart(builder, "performer", message.getPerformer()); - buildTextPart(builder, "reply_markup", message.replyMarkupJson()); + protected void addBody(OutgoingAudioMessage message) { + fillCommonMediaParts(message); + buildMediaPart("audio", message.getFilenameWithExtension(), message.getAudio()); + buildTextPart("title", message.getTitle()); + buildTextPart("duration", message.getDurationSeconds()); + buildTextPart("performer", message.getPerformer()); + buildTextPart("reply_markup", message.replyMarkupJson()); } } static class OutgoingVideoMessageHandler extends OutgoingMessageHandler<OutgoingVideoMessage> { - public OutgoingVideoMessageHandler(AsyncHttpClient asyncHttpClient, int bufferSize, ObjectMapper mapper, - String baseUri) { - super(asyncHttpClient, bufferSize, mapper, baseUri + "/sendVideo", null, MessageResult.class); + public OutgoingVideoMessageHandler(HttpClient client, ObjectMapper mapper, + String baseUri, int bufferSize) { + super(client, mapper, baseUri + "/sendVideo", null, MessageResult.class, bufferSize); } @Override - protected void addBody(RequestBuilder builder, OutgoingVideoMessage message) { - fillCommonMediaParts(builder, message); - buildMediaPart(builder, "video", message.getFilenameWithExtension(), message.getVideo()); - buildTextPart(builder, "caption", message.getCaption()); - buildTextPart(builder, "duration", message.getDurationSeconds()); - buildTextPart(builder, "width", message.getWidth()); - buildTextPart(builder, "height", message.getHeight()); - buildTextPart(builder, "reply_markup", message.replyMarkupJson()); + protected void 
addBody(OutgoingVideoMessage message) { + fillCommonMediaParts(message); + buildMediaPart("video", message.getFilenameWithExtension(), message.getVideo()); + buildTextPart("caption", message.getCaption()); + buildTextPart("duration", message.getDurationSeconds()); + buildTextPart("width", message.getWidth()); + buildTextPart("height", message.getHeight()); + buildTextPart("reply_markup", message.replyMarkupJson()); } - } static class OutgoingDocumentMessageHandler extends OutgoingMessageHandler<OutgoingDocumentMessage> { - public OutgoingDocumentMessageHandler(AsyncHttpClient asyncHttpClient, int bufferSize, ObjectMapper mapper, - String baseUri) { - super(asyncHttpClient, bufferSize, mapper, baseUri + "/sendDocument", null, MessageResult.class); + public OutgoingDocumentMessageHandler(HttpClient client, ObjectMapper mapper, + String baseUri, int bufferSize) { + super(client, mapper, baseUri + "/sendDocument", null, MessageResult.class, bufferSize); } @Override - protected void addBody(RequestBuilder builder, OutgoingDocumentMessage message) { - fillCommonMediaParts(builder, message); - buildMediaPart(builder, "document", message.getFilenameWithExtension(), message.getDocument()); - buildTextPart(builder, "caption", message.getCaption()); - buildTextPart(builder, "reply_markup", message.replyMarkupJson()); + protected void addBody(OutgoingDocumentMessage message) { + fillCommonMediaParts(message); + buildMediaPart("document", message.getFilenameWithExtension(), message.getDocument()); + buildTextPart("caption", message.getCaption()); + buildTextPart("reply_markup", message.replyMarkupJson()); } } static class OutgoingPhotoMessageHandler extends OutgoingMessageHandler<OutgoingPhotoMessage> { - public OutgoingPhotoMessageHandler(AsyncHttpClient asyncHttpClient, int bufferSize, ObjectMapper mapper, - String baseUri) { - super(asyncHttpClient, bufferSize, mapper, baseUri + "/sendPhoto", null, MessageResult.class); + public OutgoingPhotoMessageHandler(HttpClient client, 
ObjectMapper mapper, + String baseUri, int bufferSize) { + super(client, mapper, baseUri + "/sendPhoto", null, MessageResult.class, bufferSize); } @Override - protected void addBody(RequestBuilder builder, OutgoingPhotoMessage message) { - fillCommonMediaParts(builder, message); - buildMediaPart(builder, "photo", message.getFilenameWithExtension(), message.getPhoto()); - buildTextPart(builder, "caption", message.getCaption()); - buildTextPart(builder, "reply_markup", message.replyMarkupJson()); + protected void addBody(OutgoingPhotoMessage message) { + fillCommonMediaParts(message); + buildMediaPart("photo", message.getFilenameWithExtension(), message.getPhoto()); + buildTextPart("caption", message.getCaption()); + buildTextPart("reply_markup", message.replyMarkupJson()); } } static class OutgoingStickerMessageHandler extends OutgoingMessageHandler<OutgoingStickerMessage> { - public OutgoingStickerMessageHandler(AsyncHttpClient asyncHttpClient, int bufferSize, ObjectMapper mapper, - String baseUri) { - super(asyncHttpClient, bufferSize, mapper, baseUri + "/sendSticker", null, MessageResult.class); + public OutgoingStickerMessageHandler(HttpClient client, ObjectMapper mapper, + String baseUri, int bufferSize) { + super(client, mapper, baseUri + "/sendSticker", null, MessageResult.class, bufferSize); } @Override - protected void addBody(RequestBuilder builder, OutgoingStickerMessage message) { - fillCommonMediaParts(builder, message); + protected void addBody(OutgoingStickerMessage message) { + fillCommonMediaParts(message); if (message.getSticker() != null) { - buildTextPart(builder, "sticker", message.getSticker()); + buildTextPart("sticker", message.getSticker()); } else { - buildMediaPart(builder, "sticker", message.getFilenameWithExtension(), message.getStickerImage()); - } - } - } - - abstract static class OutgoingMessageHandler<T extends OutgoingMessage> { - protected final ObjectMapper mapper; - private final AsyncHttpClient asyncHttpClient; - private final 
int bufferSize; - private final String contentType; - private final String uri; - private final Class<? extends MessageResult> resultClass; - - public OutgoingMessageHandler(AsyncHttpClient asyncHttpClient, int bufferSize, ObjectMapper mapper, String uri, - String contentType, Class<? extends MessageResult> resultClass) { - this.resultClass = resultClass; - this.asyncHttpClient = asyncHttpClient; - this.bufferSize = bufferSize; - this.mapper = mapper; - this.uri = uri; - this.contentType = contentType; - } - - public void sendMessage(Exchange exchange, AsyncCallback callback, T message) { - final RequestBuilder builder = new RequestBuilder("POST") - .setUrl(uri); - if (contentType != null) { - builder.setHeader("Content-Type", contentType); - } - builder.setHeader("Accept", "application/json"); - addBody(builder, message); - asyncHttpClient.executeRequest(builder.build(), - new TelegramAsyncHandler(exchange, callback, uri, bufferSize, mapper, resultClass)); - } - - protected abstract void addBody(RequestBuilder builder, T message); - - protected void fillCommonMediaParts(RequestBuilder builder, OutgoingMessage message) { - buildTextPart(builder, "chat_id", message.getChatId()); - buildTextPart(builder, "reply_to_message_id", message.getReplyToMessageId()); - buildTextPart(builder, "disable_notification", message.getDisableNotification()); - } - - protected void buildTextPart(RequestBuilder builder, String name, Object value) { - if (value != null) { - builder.addBodyPart(new StringPart(name, String.valueOf(value), "text/plain", StandardCharsets.UTF_8)); + buildMediaPart("sticker", message.getFilenameWithExtension(), message.getStickerImage()); } } - - protected void buildMediaPart(RequestBuilder builder, String name, String fileNameWithExtension, byte[] value) { - builder.addBodyPart( - new ByteArrayPart(name, value, "application/octet-stream", StandardCharsets.UTF_8, fileNameWithExtension)); - } - } - private static final class TelegramAsyncHandler implements 
AsyncHandler<Exchange> { - - private final Exchange exchange; - private final AsyncCallback callback; - private final String url; - private final ByteArrayOutputStream os; - private final ObjectMapper mapper; - private int statusCode; - private String statusText; - private String contentType; - private String contentEncoding; - private Charset charset; - private Class<? extends MessageResult> onCompletedType; - - private TelegramAsyncHandler(Exchange exchange, AsyncCallback callback, String url, int bufferSize, - ObjectMapper mapper, Class<? extends MessageResult> onCompletedType) { - this.onCompletedType = onCompletedType; - this.exchange = exchange; - this.callback = callback; - this.url = url; - this.os = new ByteArrayOutputStream(bufferSize); - this.mapper = mapper; - } - - @Override - public void onThrowable(Throwable t) { - if (LOG.isTraceEnabled()) { - LOG.trace("{} onThrowable {}", exchange.getExchangeId(), t.getMessage(), t); - } - exchange.setException(t); - callback.done(false); - } - - @Override - public Exchange onCompleted() throws Exception { - if (LOG.isTraceEnabled()) { - LOG.trace("{} onCompleted", exchange.getExchangeId()); - } - try { - // copy from output stream to input stream - os.flush(); - os.close(); - final boolean success = statusCode >= 200 && statusCode < 300; - try (InputStream maybeGzStream = new ByteArrayInputStream(os.toByteArray()); - InputStream is = GZIPHelper.uncompressGzip(contentEncoding, maybeGzStream); - Reader r = new InputStreamReader(is, charset)) { - - if (success) { - final Object result; - if (LOG.isTraceEnabled()) { - final String body = IOHelper.toString(r); - LOG.trace("Received body for {}: {}", url, body); - result = mapper.readValue(body, onCompletedType); - } else { - result = mapper.readValue(r, onCompletedType); - } - - exchange.getMessage().setBody(result); - } else { - throw new RuntimeCamelException( - url + " responded: " + statusCode + " " + statusText + " " + IOHelper.toString(r)); - } - } catch 
(IOException e) { - throw new RuntimeCamelException("Could not parse the response from " + url, e); - } - } catch (Exception e) { - exchange.setException(e); - } finally { - // signal we are done - callback.done(false); - } - return exchange; - } - - @Override - public String toString() { - return "AhcAsyncHandler for exchangeId: " + exchange.getExchangeId() + " -> " + url; - } - - @Override - public State onBodyPartReceived(HttpResponseBodyPart bodyPart) - throws Exception { - // write body parts to stream, which we will bind to the Camel Exchange in onComplete - os.write(bodyPart.getBodyPartBytes()); - if (LOG.isTraceEnabled()) { - LOG.trace("{} onBodyPartReceived {} bytes", exchange.getExchangeId(), bodyPart.length()); - } - return State.CONTINUE; - } - - @Override - public State onStatusReceived(HttpResponseStatus responseStatus) { - if (LOG.isTraceEnabled()) { - LOG.trace("{} onStatusReceived {}", exchange.getExchangeId(), responseStatus); - } - statusCode = responseStatus.getStatusCode(); - statusText = responseStatus.getStatusText(); - return State.CONTINUE; - } - - @Override - public State onHeadersReceived(HttpHeaders headers) { - contentEncoding = headers.get("Content-Encoding"); - contentType = headers.get("Content-Type"); - charset = withDefault(extractContentTypeCharsetAttribute(contentType), StandardCharsets.UTF_8); - return State.CONTINUE; - } - } } diff --git a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckErrorTest.java b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckErrorTest.java index fb4ad5e0b31..3f301b31072 100644 --- a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckErrorTest.java +++ b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckErrorTest.java @@ -83,7 +83,7 @@ public class TelegramConsumerHealthCheckErrorTest extends 
TelegramTestSupport { rc.getDetails().get(HealthCheck.ENDPOINT_URI)); Throwable e = rc.getError().get(); - Assertions.assertTrue(e.getMessage().contains("401 Unauthorized")); + Assertions.assertTrue(e.getMessage().contains("401")); Assertions.assertEquals(401, rc.getDetails().get(HealthCheck.HTTP_RESPONSE_CODE)); } diff --git a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramWebhookRegistrationTest.java b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramWebhookRegistrationTest.java index 9c4a2a5cbb7..41da912f2d7 100644 --- a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramWebhookRegistrationTest.java +++ b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramWebhookRegistrationTest.java @@ -16,6 +16,10 @@ */ package org.apache.camel.component.telegram; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.util.List; import java.util.concurrent.TimeUnit; @@ -27,8 +31,6 @@ import org.apache.camel.component.telegram.util.TelegramMockRoutes.MockProcessor import org.apache.camel.component.telegram.util.TelegramTestSupport; import org.apache.camel.component.telegram.util.TelegramTestUtil; import org.apache.camel.impl.DefaultCamelContext; -import org.asynchttpclient.Dsl; -import org.asynchttpclient.Response; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; @@ -52,10 +54,12 @@ public class TelegramWebhookRegistrationTest extends TelegramTestSupport { Awaitility.await() .atMost(5, TimeUnit.SECONDS) .until(() -> { - final Response testResponse = Dsl.asyncHttpClient() - .prepareGet("http://localhost:" + port + "/botmock-token/getTest") - .execute().get(); - return testResponse.getStatusCode() == 200; + HttpClient client = HttpClient.newBuilder().build(); + HttpRequest request = HttpRequest.newBuilder() + 
.uri(URI.create("http://localhost:" + port + "/botmock-token/getTest")).GET().build(); + + final HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString()); + return response.statusCode() == 200; }); context().addRoutes(new RouteBuilder() { @@ -92,10 +96,12 @@ public class TelegramWebhookRegistrationTest extends TelegramTestSupport { Awaitility.await() .atMost(5, TimeUnit.SECONDS) .until(() -> { - final Response testResponse = Dsl.asyncHttpClient() - .prepareGet("http://localhost:" + port + "/botmock-token/getTest") - .execute().get(); - return testResponse.getStatusCode() == 200; + HttpClient client = HttpClient.newBuilder().build(); + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create("http://localhost:" + port + "/botmock-token/getTest")).GET().build(); + + final HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString()); + return response.statusCode() == 200; }); context().addRoutes(new RouteBuilder() { diff --git a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/integration/TelegramServiceIT.java b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/integration/TelegramServiceIT.java index 5cf2b6248bc..6df830d1d69 100644 --- a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/integration/TelegramServiceIT.java +++ b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/integration/TelegramServiceIT.java @@ -56,13 +56,17 @@ import org.apache.camel.component.telegram.util.TelegramApiConfig; import org.apache.camel.component.telegram.util.TelegramTestSupport; import org.apache.camel.component.telegram.util.TelegramTestUtil; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static 
org.junit.jupiter.api.Assertions.assertNotNull; @EnabledIfEnvironmentVariable(named = "TELEGRAM_AUTHORIZATION_TOKEN", matches = ".*") public class TelegramServiceIT extends TelegramTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(TelegramServiceIT.class); protected TelegramApiConfig getTelegramApiConfig() { return TelegramApiConfig.fromEnv(); @@ -74,6 +78,7 @@ public class TelegramServiceIT extends TelegramTestSupport { * So, for this test to succeed a human should have sent some messages to the bot manually * before running the test */ IncomingMessage res = consumer.receiveBody("telegram://bots", 5000, IncomingMessage.class); + LOG.debug("Chat ID: {} - use this for running the tests", res.getChat().getId()); assertNotNull(res); } @@ -507,6 +512,7 @@ public class TelegramServiceIT extends TelegramTestSupport { Assertions.assertEquals("text_link", incomingMessage.getCaptionEntities().get(0).getType()); } + @Disabled("Unlike testEditMediaToAnimation, this does not work. 
It needs to be investigated") @Test void testEditMediaToAudio() throws IOException { @@ -538,7 +544,7 @@ public class TelegramServiceIT extends TelegramTestSupport { void testEditMediaToAnimation() throws IOException { //given - String mediaUrl = "CgADBAADlZ8AAtMdZAd-ZylyB-kkGRYE"; + String mediaUrl = "http://file-examples.com/storage/fe783a5cbb6323602a28c66/2017/10/file_example_GIF_500kB.gif"; String caption = "caption"; //send photo message @@ -659,9 +665,10 @@ public class TelegramServiceIT extends TelegramTestSupport { Assertions.assertTrue(incomingMessage.isResult()); } + @Disabled("This one requires manual setup of a game") @Test void testSendGameMessage() { - String gameShortName = "<game-short-name>"; + String gameShortName = "gameShortName"; String gameText = "<game-text>"; long gameScore = 15L; diff --git a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/integration/TelegramServiceProxyIT.java b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/integration/TelegramServiceProxyIT.java index c36dc31c8c8..8d7325a6975 100644 --- a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/integration/TelegramServiceProxyIT.java +++ b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/integration/TelegramServiceProxyIT.java @@ -20,6 +20,7 @@ import org.apache.camel.component.telegram.model.IncomingMessage; import org.apache.camel.component.telegram.model.OutgoingTextMessage; import org.apache.camel.component.telegram.util.TelegramApiConfig; import org.apache.camel.component.telegram.util.TelegramTestSupport; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; @@ -36,8 +37,13 @@ public class TelegramServiceProxyIT extends TelegramTestSupport { @BeforeAll public static void configureProxyFromEnv() { proxyHost = 
System.getenv("TELEGRAM_PROXY_HOST"); + Assumptions.assumeTrue(proxyHost != null, "There is no proxy host defined on this environment"); + proxyPort = System.getenv("TELEGRAM_PROXY_PORT"); + Assumptions.assumeTrue(proxyPort != null, "There is no proxy port defined on this environment"); + proxyType = System.getenv("TELEGRAM_PROXY_TYPE"); + Assumptions.assumeTrue(proxyType != null, "There is no proxy type defined on this environment"); } protected TelegramApiConfig getTelegramApiConfig() { diff --git a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/service/OutgoingMessageHandlerTest.java b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/service/OutgoingMessageHandlerTest.java new file mode 100644 index 00000000000..f45c07616a5 --- /dev/null +++ b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/service/OutgoingMessageHandlerTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.component.telegram.service; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Set; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@DisplayName("Multipart message serialization tests") +class OutgoingMessageHandlerTest { + + @DisplayName("Test single body message") + @Test + void testSingleBody() { + TelegramBodyPublisher bodyPublisher = new TelegramBodyPublisher(100); + + bodyPublisher.addBodyPart(new TelegramBodyPublisher.SingleBodyPart("message=\"value\"")); + + ByteBuffer buffer = ByteBuffer.allocate(100); + Set<TelegramBodyPublisher.TelegramBodyPart> bodyParts = bodyPublisher.getBodyParts(); + + TelegramBodyPublisher.serialize(bodyParts, buffer, ""); + + int written = buffer.capacity() - buffer.remaining(); + String serialized = new String(buffer.array(), 0, written); + + assertEquals("message=\"value\"", serialized); + } + + @DisplayName("Test multi body message with 1 body") + @Test + void testMultiBody() { + TelegramBodyPublisher bodyPublisher = new TelegramBodyPublisher(1024); + + bodyPublisher + .addBodyPart(new TelegramBodyPublisher.MultilineBodyPart<>("message", "value1", StandardCharsets.UTF_8.name())); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + Set<TelegramBodyPublisher.TelegramBodyPart> bodyParts = bodyPublisher.getBodyParts(); + + TelegramBodyPublisher.serialize(bodyParts, buffer, "aaa"); + + int written = buffer.capacity() - buffer.remaining(); + String serialized = new String(buffer.array(), 0, written); + + assertEquals("--aaa\r\n" + + "Content-Disposition: form-data; name=\"message\"\r\n" + + "Content-Type: UTF-8; charset=UTF-8\r\n\r\n" + + "value1\r\n" + + "--aaa--\r\n", + serialized); + } + + @DisplayName("Test multi body message with 1 body and multiple headers") + @Test + void testMultiBodyWithMoreHeaders() { + TelegramBodyPublisher bodyPublisher = new 
TelegramBodyPublisher(1024); + + final TelegramBodyPublisher.MultilineBodyPart<String> stringMultilineBodyPart + = new TelegramBodyPublisher.MultilineBodyPart<>("message", "value1", StandardCharsets.UTF_8.name()); + + // Headers must be serialized in the insertion order + stringMultilineBodyPart.addHeader("key1", "value1"); + stringMultilineBodyPart.addHeader("key2", "value2"); + bodyPublisher.addBodyPart(stringMultilineBodyPart); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + Set<TelegramBodyPublisher.TelegramBodyPart> bodyParts = bodyPublisher.getBodyParts(); + + TelegramBodyPublisher.serialize(bodyParts, buffer, "aaa"); + + int written = buffer.capacity() - buffer.remaining(); + String serialized = new String(buffer.array(), 0, written); + + assertEquals("--aaa\r\n" + + "Content-Disposition: form-data; name=\"message\"; key1=\"value1\"; key2=\"value2\"\r\n" + + + "Content-Type: UTF-8; charset=UTF-8\r\n\r\n" + + "value1\r\n" + + "--aaa--\r\n", + serialized); + } + + @DisplayName("Test multi body message with 2 bodies and multiple headers") + @Test + void testMultiBodyWith2BodiesWithMoreHeaders() { + TelegramBodyPublisher bodyPublisher = new TelegramBodyPublisher(1024); + + final TelegramBodyPublisher.MultilineBodyPart<String> stringMultilineBodyPart1 + = new TelegramBodyPublisher.MultilineBodyPart<>("message1", "value1", StandardCharsets.UTF_8.name()); + + // Headers must be serialized in the insertion order + stringMultilineBodyPart1.addHeader("key1", "value1"); + stringMultilineBodyPart1.addHeader("key2", "value2"); + bodyPublisher.addBodyPart(stringMultilineBodyPart1); + + final TelegramBodyPublisher.MultilineBodyPart<String> stringMultilineBodyPart2 + = new TelegramBodyPublisher.MultilineBodyPart<>("message2", "value2", StandardCharsets.UTF_8.name()); + + // Headers must be serialized in the insertion order + stringMultilineBodyPart2.addHeader("key1", "value1"); + stringMultilineBodyPart2.addHeader("key2", "value2"); + 
bodyPublisher.addBodyPart(stringMultilineBodyPart2); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + Set<TelegramBodyPublisher.TelegramBodyPart> bodyParts = bodyPublisher.getBodyParts(); + + TelegramBodyPublisher.serialize(bodyParts, buffer, "aaa"); + + int written = buffer.capacity() - buffer.remaining(); + String serialized = new String(buffer.array(), 0, written); + + assertEquals("--aaa\r\n" + + "Content-Disposition: form-data; name=\"message1\"; key1=\"value1\"; key2=\"value2\"\r\n" + + + "Content-Type: UTF-8; charset=UTF-8\r\n\r\n" + + "value1\r\n" + + "--aaa\r\n" + + "Content-Disposition: form-data; name=\"message2\"; key1=\"value1\"; key2=\"value2\"\r\n" + + + "Content-Type: UTF-8; charset=UTF-8\r\n\r\n" + + "value2\r\n" + + "--aaa--\r\n", + serialized); + } +} diff --git a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/service/TelegramAsyncHandlerTest.java b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/service/TelegramAsyncHandlerTest.java new file mode 100644 index 00000000000..75aee9af209 --- /dev/null +++ b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/service/TelegramAsyncHandlerTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.telegram.service; + +import java.nio.charset.StandardCharsets; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@DisplayName("Tests charset information parsing") +class TelegramAsyncHandlerTest { + + @DisplayName("default charset information") + @Test + void testExtractCharset() { + assertEquals("UTF-8", TelegramAsyncHandler.extractCharset("Content-Type: text/plain; charset=UTF-8", + StandardCharsets.US_ASCII.name())); + } + + @DisplayName("null charset information") + @Test + void testExtractCharsetNull() { + assertEquals("UTF-8", TelegramAsyncHandler.extractCharset(null, StandardCharsets.UTF_8.name())); + } + + @DisplayName("with more items than expected") + @Test + void testExtractCharsetWithMoreItems() { + assertEquals("UTF-8", TelegramAsyncHandler.extractCharset("Content-Type: text/plain; charset=UTF-8; name=\"some-name\"", + StandardCharsets.US_ASCII.name())); + } + + @DisplayName("with the charset information in the middle") + @Test + void testExtractCharsetInTheMiddle() { + assertEquals("UTF-8", TelegramAsyncHandler.extractCharset("Content-Type: text/plain; name=\"some-name\"; charset=UTF-8", + StandardCharsets.US_ASCII.name())); + } + + @DisplayName("default charset information") + @Test + void testMissingCharset() { + assertEquals("ISO-8859-1", TelegramAsyncHandler.extractCharset("Content-Type: text/plain", + StandardCharsets.ISO_8859_1.name())); + } +} diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_19.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_19.adoc index 3c66cf6f1f5..a39d0903206 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_19.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_19.adoc @@ -81,6 +81,14 @@ The 
deprecated options were removed and should be replaced by the following opti Parameters in form of `@name` are extracted from the body or message and their type is preserved and translated into corresponding `com.google.cloud.bigquery.StandardSQLTypeName`. See the https://cloud.google.com/java/docs/reference/google-cloud-bigquery/latest/com.google.cloud.bigquery.QueryParameterValue[documentation] for more information. (Conversion to StandardSQLTypeName.STRING was used for each type before) + +=== camel-telegram + +The component was migrated from the Async HTTP Client to the built-in HTTP client available in Java 11 and newer. As such: +* the parameter `clientConfig`, which received an `AsyncHttpClientConfig` instance, was removed +* the parameter `client`, which received an `AsyncHttpClient` instance, now receives a `java.net.http.HttpClient` instance. + + === xtokenize language The xtokenize language has moved from `camel-xml-jaxp` to `camel-stax` JAR because diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/TelegramComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/TelegramComponentBuilderFactory.java index 49039f414d1..614d6390c34 100644 --- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/TelegramComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/TelegramComponentBuilderFactory.java @@ -134,39 +134,20 @@ public interface TelegramComponentBuilderFactory { return this; } /** - * To use a custom AsyncHttpClient. + * To use a custom java.net.http.HttpClient. * - * The option is a: - * <code>org.asynchttpclient.AsyncHttpClient</code> type. + * The option is a: <code>java.net.http.HttpClient</code> + * type. 
* * Group: advanced * * @param client the value to set * @return the dsl builder */ - default TelegramComponentBuilder client( - org.asynchttpclient.AsyncHttpClient client) { + default TelegramComponentBuilder client(java.net.http.HttpClient client) { doSetProperty("client", client); return this; } - /** - * To configure the AsyncHttpClient to use a custom - * com.ning.http.client.AsyncHttpClientConfig instance. - * - * The option is a: - * <code>org.asynchttpclient.AsyncHttpClientConfig</code> - * type. - * - * Group: advanced - * - * @param clientConfig the value to set - * @return the dsl builder - */ - default TelegramComponentBuilder clientConfig( - org.asynchttpclient.AsyncHttpClientConfig clientConfig) { - doSetProperty("clientConfig", clientConfig); - return this; - } /** * The default Telegram authorization token to be used when the * information is not provided in the endpoints. @@ -204,8 +185,7 @@ public interface TelegramComponentBuilderFactory { case "lazyStartProducer": ((TelegramComponent) component).setLazyStartProducer((boolean) value); return true; case "autowiredEnabled": ((TelegramComponent) component).setAutowiredEnabled((boolean) value); return true; case "baseUri": ((TelegramComponent) component).setBaseUri((java.lang.String) value); return true; - case "client": ((TelegramComponent) component).setClient((org.asynchttpclient.AsyncHttpClient) value); return true; - case "clientConfig": ((TelegramComponent) component).setClientConfig((org.asynchttpclient.AsyncHttpClientConfig) value); return true; + case "client": ((TelegramComponent) component).setClient((java.net.http.HttpClient) value); return true; case "authorizationToken": ((TelegramComponent) component).setAuthorizationToken((java.lang.String) value); return true; default: return false; } diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/TelegramEndpointBuilderFactory.java 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/TelegramEndpointBuilderFactory.java index 488bbbada41..d55a104c2b6 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/TelegramEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/TelegramEndpointBuilderFactory.java @@ -895,76 +895,6 @@ public interface TelegramEndpointBuilderFactory { doSetProperty("baseUri", baseUri); return this; } - /** - * The initial in-memory buffer size used when transferring data between - * Camel and AHC Client. - * - * The option is a: <code>int</code> type. - * - * Default: 4096 - * Group: advanced - * - * @param bufferSize the value to set - * @return the dsl builder - */ - default AdvancedTelegramEndpointConsumerBuilder bufferSize( - int bufferSize) { - doSetProperty("bufferSize", bufferSize); - return this; - } - /** - * The initial in-memory buffer size used when transferring data between - * Camel and AHC Client. - * - * The option will be converted to a <code>int</code> type. - * - * Default: 4096 - * Group: advanced - * - * @param bufferSize the value to set - * @return the dsl builder - */ - default AdvancedTelegramEndpointConsumerBuilder bufferSize( - String bufferSize) { - doSetProperty("bufferSize", bufferSize); - return this; - } - /** - * To configure the AsyncHttpClient to use a custom - * com.ning.http.client.AsyncHttpClientConfig instance. - * - * The option is a: - * <code>org.asynchttpclient.AsyncHttpClientConfig</code> - * type. - * - * Group: advanced - * - * @param clientConfig the value to set - * @return the dsl builder - */ - default AdvancedTelegramEndpointConsumerBuilder clientConfig( - org.asynchttpclient.AsyncHttpClientConfig clientConfig) { - doSetProperty("clientConfig", clientConfig); - return this; - } - /** - * To configure the AsyncHttpClient to use a custom - * com.ning.http.client.AsyncHttpClientConfig instance. 
- * - * The option will be converted to a - * <code>org.asynchttpclient.AsyncHttpClientConfig</code> - * type. - * - * Group: advanced - * - * @param clientConfig the value to set - * @return the dsl builder - */ - default AdvancedTelegramEndpointConsumerBuilder clientConfig( - String clientConfig) { - doSetProperty("clientConfig", clientConfig); - return this; - } } /** @@ -1162,76 +1092,6 @@ public interface TelegramEndpointBuilderFactory { doSetProperty("baseUri", baseUri); return this; } - /** - * The initial in-memory buffer size used when transferring data between - * Camel and AHC Client. - * - * The option is a: <code>int</code> type. - * - * Default: 4096 - * Group: advanced - * - * @param bufferSize the value to set - * @return the dsl builder - */ - default AdvancedTelegramEndpointProducerBuilder bufferSize( - int bufferSize) { - doSetProperty("bufferSize", bufferSize); - return this; - } - /** - * The initial in-memory buffer size used when transferring data between - * Camel and AHC Client. - * - * The option will be converted to a <code>int</code> type. - * - * Default: 4096 - * Group: advanced - * - * @param bufferSize the value to set - * @return the dsl builder - */ - default AdvancedTelegramEndpointProducerBuilder bufferSize( - String bufferSize) { - doSetProperty("bufferSize", bufferSize); - return this; - } - /** - * To configure the AsyncHttpClient to use a custom - * com.ning.http.client.AsyncHttpClientConfig instance. - * - * The option is a: - * <code>org.asynchttpclient.AsyncHttpClientConfig</code> - * type. - * - * Group: advanced - * - * @param clientConfig the value to set - * @return the dsl builder - */ - default AdvancedTelegramEndpointProducerBuilder clientConfig( - org.asynchttpclient.AsyncHttpClientConfig clientConfig) { - doSetProperty("clientConfig", clientConfig); - return this; - } - /** - * To configure the AsyncHttpClient to use a custom - * com.ning.http.client.AsyncHttpClientConfig instance. 
- * - * The option will be converted to a - * <code>org.asynchttpclient.AsyncHttpClientConfig</code> - * type. - * - * Group: advanced - * - * @param clientConfig the value to set - * @return the dsl builder - */ - default AdvancedTelegramEndpointProducerBuilder clientConfig( - String clientConfig) { - doSetProperty("clientConfig", clientConfig); - return this; - } } /** @@ -1363,73 +1223,6 @@ public interface TelegramEndpointBuilderFactory { doSetProperty("baseUri", baseUri); return this; } - /** - * The initial in-memory buffer size used when transferring data between - * Camel and AHC Client. - * - * The option is a: <code>int</code> type. - * - * Default: 4096 - * Group: advanced - * - * @param bufferSize the value to set - * @return the dsl builder - */ - default AdvancedTelegramEndpointBuilder bufferSize(int bufferSize) { - doSetProperty("bufferSize", bufferSize); - return this; - } - /** - * The initial in-memory buffer size used when transferring data between - * Camel and AHC Client. - * - * The option will be converted to a <code>int</code> type. - * - * Default: 4096 - * Group: advanced - * - * @param bufferSize the value to set - * @return the dsl builder - */ - default AdvancedTelegramEndpointBuilder bufferSize(String bufferSize) { - doSetProperty("bufferSize", bufferSize); - return this; - } - /** - * To configure the AsyncHttpClient to use a custom - * com.ning.http.client.AsyncHttpClientConfig instance. - * - * The option is a: - * <code>org.asynchttpclient.AsyncHttpClientConfig</code> - * type. - * - * Group: advanced - * - * @param clientConfig the value to set - * @return the dsl builder - */ - default AdvancedTelegramEndpointBuilder clientConfig( - org.asynchttpclient.AsyncHttpClientConfig clientConfig) { - doSetProperty("clientConfig", clientConfig); - return this; - } - /** - * To configure the AsyncHttpClient to use a custom - * com.ning.http.client.AsyncHttpClientConfig instance. 
- * - * The option will be converted to a - * <code>org.asynchttpclient.AsyncHttpClientConfig</code> - * type. - * - * Group: advanced - * - * @param clientConfig the value to set - * @return the dsl builder - */ - default AdvancedTelegramEndpointBuilder clientConfig(String clientConfig) { - doSetProperty("clientConfig", clientConfig); - return this; - } } public interface TelegramBuilders {
