This is an automated email from the ASF dual-hosted git repository. maciej pushed a commit to branch java-tcp-tls in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 6d8386304eb34c1b8ece8d26b2ace1d73cc7cd2a Author: Maciej Modzelewski <[email protected]> AuthorDate: Mon Nov 17 12:39:20 2025 +0100 feat(java): add TLS support to TCP connection --- core/certs/{iggy_cert.pem => iggy_ca_cert.pem} | 0 core/certs/iggy_cert.pem | 36 ++++---- core/certs/iggy_key.pem | 52 ++++++------ core/connectors/runtime/README.md | 2 +- core/connectors/runtime/config.toml | 2 +- core/connectors/runtime/example_config/config.toml | 4 + .../iggy/client/async/tcp/AsyncIggyTcpClient.java | 99 +++++++++++++++++----- .../iggy/client/async/tcp/AsyncTcpConnection.java | 92 ++++++++++++++------ .../iggy/client/blocking/tcp/IggyTcpClient.java | 71 ++++++++++++++-- .../client/blocking/tcp/InternalTcpClient.java | 33 +++++++- 10 files changed, 283 insertions(+), 108 deletions(-) diff --git a/core/certs/iggy_cert.pem b/core/certs/iggy_ca_cert.pem similarity index 100% copy from core/certs/iggy_cert.pem copy to core/certs/iggy_ca_cert.pem diff --git a/core/certs/iggy_cert.pem b/core/certs/iggy_cert.pem index 2ff51e122..db2ab1360 100644 --- a/core/certs/iggy_cert.pem +++ b/core/certs/iggy_cert.pem @@ -1,21 +1,21 @@ -----BEGIN CERTIFICATE----- -MIIDezCCAmOgAwIBAgIUFXqPQOAPI6HS7GBAK0JF2k5OIv0wDQYJKoZIhvcNAQEL +MIIDaDCCAlCgAwIBAgIUIiNw0u8mef9MUpyav3fBW2WPDpcwDQYJKoZIhvcNAQEL BQAwPzELMAkGA1UEBhMCUEwxDTALBgNVBAoMBElnZ3kxDTALBgNVBAsMBElnZ3kx -EjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0yNTA3MDkxMTM5MzdaFw0zNTA3MDcxMTM5 -MzdaMD8xCzAJBgNVBAYTAlBMMQ0wCwYDVQQKDARJZ2d5MQ0wCwYDVQQLDARJZ2d5 -MRIwEAYDVQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK -AoIBAQDD/vCGnd4anscfJ1p1PgAiMPIy1KH9/gHIIN1SYq8SjAyz0YR09POpMmIq -B2UlrPCE86mE/80Ai/qsfsxe9T3r+Ymnb9wThuAbOmtu8Lpu5xcq34lnqUieaBVj -qFT2h+qM2ppLYIHgDAeNdcKtCSOQF0hq4jtyszKmyOccn4gzWtAf+3P9wtJ/kWGu -8zACxPERo5/sFym2DvHB/8YwOp6ypsWqi4lMwoM29L7AsIA/+QN9JQBrITGTWYTS -ZNvVOrZJCzxlkppxvTOmPa92BVM3Ks/eKz7I0YMO/lGzRod77VnvnSayBVXQKsoz -XgBNozmHdikaCZBJevJhppPArakjAgMBAAGjbzBtMB0GA1UdDgQWBBRpAPK7Vrdo -oPKKXBrhbT4IG69i8TAfBgNVHSMEGDAWgBRpAPK7VrdooPKKXBrhbT4IG69i8TAP -BgNVHRMBAf8EBTADAQH/MBoGA1UdEQQTMBGCCWxvY2FsaG9zdIcEfwAAATANBgkq -hkiG9w0BAQsFAAOCAQEAkzhSsXK23WwBnm/x7lE3K1qGXMiF7DAGlLpaeCoIXuDj -q28syAxuwPR0lJkcl0NQky53w/cPf3XnknKU2PBbUWCPS3SnTf9oAJKFL+AUv8UD -4DYjulJJVYivvCrzQJYSNp6qQHE9FW06qCnOqClPBUrPgx4RYD9L139fu9nn64w4 -u05Jm86gXTdQfyPUf2MVegsozYcDzyOncnxt5PLkfpr44qTh3l3drVqabKUuz+yw -WHl0yhntS4+vF2MHlleXzsVtKQSraTi6ZBEdIATo3jdSpncIg8rLVu6uM01PweJ8 -0NKgTLE3/Q83PMKQ8tErXYHqL4XzKlipSvSa/+/16g== +EjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0yNTExMTcwOTIxMDhaFw0yNjExMTcwOTIx +MDhaMBQxEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEP +ADCCAQoCggEBANnLgvBKLRSXQWJMIO/BM0DSHo6SnCP+RAdsjTLJGSt+ylkJsl7W +AVEtDXkoNxuyMa2hTqtUaom1sze0heRgcGkXlnnx6hNs23yyyQ/32SIh81RPH0RM +xi7QGCI9VPLD+aRFiWDIRCit+QEbYfIkS7k2xkIipJsoqpBLJyFMM2z6+KEazJbU +0Ajxxw81nbOSFSGSMD7dIVgnod+GypMBKhzlsevn5MTcEJ1VPWAnwSLsFUeZ1MrU +BOi2brwrD7yY79YA7cgZvytTA39fuwawpk7A8NAAgdgqEsD2TOANYx2aOLp1nX7L +sqYeRc8wiuC3+Y3bMUA+lvG6kGwE4mygarMCAwEAAaOBhjCBgzAJBgNVHRMEAjAA +MAsGA1UdDwQEAwIFoDATBgNVHSUEDDAKBggrBgEFBQcDATAUBgNVHREEDTALggls +b2NhbGhvc3QwHQYDVR0OBBYEFPYlnpnKLvLJJ74clCV5h/HCq/uHMB8GA1UdIwQY +MBaAFGkA8rtWt2ig8opcGuFtPggbr2LxMA0GCSqGSIb3DQEBCwUAA4IBAQAUM31s +HbYJ7F4ywyPyScHTtNvk9n4dDhpakjpEqk7W0sMiNtwld083EMw8pa6HEfzouhIx +SIZR57Ft48TYFUyB9rFYUFauXjTiJmJVGSSkMtQU7xSRrOQ73S6Y2GFKDnVxJA+k +I6IJvWpMMylbPpDrF1ye/s4ar4kL0r7Xt/bEgIKyTIlmYLeG9UQH72bSn2R3GVnG +/ta/AX+V//fYXGXFIX1WqwHSzgb7lkrRKgyezM1qUM9yYf/Do/zxdAxxnYukg0mG +wP/239ntWuh3n2Pq20g3PvdhvRDd4bFqBTMFppnS5Pc4/Y/RV6xhCmWZaYrFWNQd +LaQdtG8xF3PV+1g4 -----END CERTIFICATE----- diff --git a/core/certs/iggy_key.pem b/core/certs/iggy_key.pem index 15df37f10..58826332b 100644 --- a/core/certs/iggy_key.pem +++ b/core/certs/iggy_key.pem @@ -1,28 +1,28 @@ -----BEGIN PRIVATE KEY----- -MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDD/vCGnd4anscf -J1p1PgAiMPIy1KH9/gHIIN1SYq8SjAyz0YR09POpMmIqB2UlrPCE86mE/80Ai/qs -fsxe9T3r+Ymnb9wThuAbOmtu8Lpu5xcq34lnqUieaBVjqFT2h+qM2ppLYIHgDAeN -dcKtCSOQF0hq4jtyszKmyOccn4gzWtAf+3P9wtJ/kWGu8zACxPERo5/sFym2DvHB -/8YwOp6ypsWqi4lMwoM29L7AsIA/+QN9JQBrITGTWYTSZNvVOrZJCzxlkppxvTOm -Pa92BVM3Ks/eKz7I0YMO/lGzRod77VnvnSayBVXQKsozXgBNozmHdikaCZBJevJh -ppPArakjAgMBAAECggEACtymF8/H71G9V0ynBrhY6Yws4ARItfrNvnmTym0Npsl8 -KrsBzDmHB2lzZ0DhqiTbb4LRcj167vzS95xPVzPgKqO5rT3VxH9FDD1AlvVgX6UO -86FPzB7ll2HNTaiWjNMbfJLg2iyPBgukn4JXN5+6CNJBgSrogILN29Bmfn+Y3L+i -nUlSoUXk2M0+7cRiwPWAYv9PXf9DuG5q53cKdnLff0brik223FvcVGxUNI1N6Lae -o1f3SwVP5eGlh8EcRQJn4l1WQfPt/NQ1BiT341aP91Or9WKYM9kSritZTZO55Thd -FblK0FHYZ5J4EWh8ThDz0mGaktloIdE/KTx2djd8gQKBgQDEMK6fQJqNhS7kBD11 -L3nBZTJUNpmLM9WjvnwfcTP/S+t85XbnVE5ITCmVPKz/chqol0ZDMMO11K+W3APd -+ZQ12O8buKHgT6tTlKoiOP0uwbnAdLjLZOWsQVddbhBErKPRu+G+5nyHVPTtdQG7 -qEYoe2xg/PNLr5/arNGLHUJixQKBgQD/vxfVHqvJBGMS4bx1p9txnKzmZqT4L79F -tG83RotHSPUqO/hbVv5U1eI8oBnifptMqaI38Uzs2+Bilg1BF4PCo55W25aC0dg6 -r+KiFIq6mUX5WKH8han4IB34JvjV6jGvzuma/oTO8fxfSimuMWX0nkODAb2HeZ4f -/VEbnw16xwKBgQCM2dKUfOI41jRK+YxR9Iq6QPf4I9bqbIVl8JzWSgSlthDT8z4B -aLJnD18PSKd/IaFoBmsoU/s+MPr7GOwh0kwIuL05rr1w+GGtON9IgJesmOLN/D6r -r72AhMy/RASj+ToHmpbA4mLnGiRZ0pYy7uWnrKyrmQ7m+KiQa2BpOtFtbQKBgBwV -wys/VAQePDcNnSGajmm6l+4xZXpv2+RuvSvnzlHEvE+oCE0Xj5SHbHHV7yHFX46a -rlrQX+8+8jRBYDE+wNR2HWRSdwPkwYcoW44LDXUScfHA/wD5OMIr3L6soPT05AH3 -igXgX3tObbWVMmCTwiuL2cQgQ80F2QyQSADthZBvAoGBAKRxpTDwBLZ9rhQUAUIy -/d0htT6eNG3fqau2WcDfvfQ4ykdFieOCpZH9OYdCDqPDB0g/RZ6dUOKpjH3sTpdJ -u5ZsQ9iqCvE2MHplqdmAn7H3sI1FKeh1TN68voa85Y+KsDurJQ4X3pNRmWinntQs -+PYpwYdDwwwaxnUeTgPUEjph +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDZy4LwSi0Ul0Fi +TCDvwTNA0h6Okpwj/kQHbI0yyRkrfspZCbJe1gFRLQ15KDcbsjGtoU6rVGqJtbM3 +tIXkYHBpF5Z58eoTbNt8sskP99kiIfNUTx9ETMYu0BgiPVTyw/mkRYlgyEQorfkB +G2HyJEu5NsZCIqSbKKqQSychTDNs+vihGsyW1NAI8ccPNZ2zkhUhkjA+3SFYJ6Hf +hsqTASoc5bHr5+TE3BCdVT1gJ8Ei7BVHmdTK1ATotm68Kw+8mO/WAO3IGb8rUwN/ +X7sGsKZOwPDQAIHYKhLA9kzgDWMdmji6dZ1+y7KmHkXPMIrgt/mN2zFAPpbxupBs +BOJsoGqzAgMBAAECggEAB3SnjVlEtM0+wEIx6HM5MXFf+Wp8bOEB7C0jokbvFSWY +gLbLw+JYljJQIUMmq8yMVdDNVCEmKoOtWG1WHgzNHFPh41fMNxEFbH3kUycdaEU4 +Qr0YqWplaGnYQafO1iauT5jPzJ/ecXQPL6ID1tm2O+drnwz3jJY2TI0+EC4/Hm5i +yRBft++4WHz6wtQBzC63KUW4OqtpNVbLRt4bbOFsLLnkD6OmL/dxgRZiiMVpPZe6 +48lJZ+2HIAVnsii9sUeYJFvUbxbw/RwI5WmEq232YXk06yQxq2Ww5hTMF7CBxPUd +9oF+juV0KaJ7tlbp1/dSjmB/YipVEOkgxEpLYifo0QKBgQDa9d0DQEywBnq7RbKM +t0vdZOdrin+S2sIHsF6Or5wTqA3eDOSmHwQ3/wXkjLksKDv0jOn32IuOdJTAhgXH +WTlilYhBGMK5KOjGQuKIBLeV+kmA/BhWsjXfJruNdBO9ZNcq0uADWZoWD/801Glx +QLkcFGoAp+1mGBqVfmNAW12rowKBgQD+oy243wLfWBDHAJhj3OSseGMqcadXSlVi +xEByxnxql/sN1ZMnaev9TOguEyrNAWx20kl8nYJAiEShMii5ESBJwz3gNq+BPVlL +Kdeeuyd4WJ8Z+rDKfSXckIILiG8C85jmlKIW7ymeD0lq6p4AIr5kU5wB6A26F/Zl +dPSu95w1sQKBgQC1lndPbfDrfsQkMV4tQwogtsERt8+rK88Eb2lL7imDR7kQcSLi +/hASnGX3sBkVnNx0KLUUvbqnTtnafIuoUr+7mYVhbzZ2No4tdmTGJxtVvzdcSDWr +GKqCwW2Dl0OTq2CifDYZsSKPr36YApzbtrCNsARlPN8t70mEt0d6pQse/wKBgCw4 +QDdrmv8YNwmoA87LzHZbDbWQOSRCh2N4e0yzRWXpcLKtVTmx/kZltahSscsaJTDZ +QeO/IqB8SZbItCO3YqkMm9E1DfNeqQQRhx1MmQUgNrj3PP5dD1cnTT4delHD5did +FSzwaYTGWNSVW8zgO7oGfwAhwUiA4swprFg6LclxAoGBALv07o4gyP9bYu/lVTYb +ygQduofL+ZIiqm1bbnhaA8TtqR62YfjPdMVt39LxNvIsba12Kq1ouapXcRI8J99M +iA7KXgjAkhtqBpOiBFeLFq9giVQkLjCMM2v5zfFfQ+K2Coij/QNG4WJiRwroHDw+ +ZHX4bn6EJY3cVDMDPXmOxMEj -----END PRIVATE KEY----- diff --git a/core/connectors/runtime/README.md b/core/connectors/runtime/README.md index a70c62b9d..483047a4f 100644 --- a/core/connectors/runtime/README.md +++ b/core/connectors/runtime/README.md @@ -25,7 +25,7 @@ token = "" # Personal Access Token (PAT) can be used instead of username and pas [iggy.tls] # Optional TLS configuration for Iggy TCP connection enabled = false -ca_file = "core/certs/iggy_cert.pem" +ca_file = "core/certs/iggy_ca_cert.pem" domain = "" # Optional domain for TLS connection [state] diff --git a/core/connectors/runtime/config.toml b/core/connectors/runtime/config.toml index fef75d678..dadcc0f35 100644 --- a/core/connectors/runtime/config.toml +++ b/core/connectors/runtime/config.toml @@ -42,7 +42,7 @@ token = "" # Personal Access Token (PAT) can be used instead of username and pas [iggy.tls] # Optional TLS configuration for Iggy TCP connection enabled = false -ca_file = "core/certs/iggy_cert.pem" +ca_file = "core/certs/iggy_ca_cert.pem" domain = "" # Optional domain for TLS connection [state] diff --git a/core/connectors/runtime/example_config/config.toml b/core/connectors/runtime/example_config/config.toml index 431f475ad..6bcbc9b22 100644 --- a/core/connectors/runtime/example_config/config.toml +++ b/core/connectors/runtime/example_config/config.toml @@ -40,6 +40,10 @@ username = "iggy" password = "iggy" token = "" # Personal Access Token (PAT) can be used instead of username and password +[iggy.tls] +enabled = false +ca_file = "core/certs/iggy_ca_cert.pem" + [state] path = "local_state" diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java index 6cada2006..4c8bf92b9 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java @@ -19,12 +19,14 @@ package org.apache.iggy.client.async.tcp; +import org.apache.commons.lang3.StringUtils; import org.apache.iggy.client.async.ConsumerGroupsClient; import org.apache.iggy.client.async.MessagesClient; import org.apache.iggy.client.async.StreamsClient; import org.apache.iggy.client.async.TopicsClient; import org.apache.iggy.client.async.UsersClient; +import java.io.File; import java.time.Duration; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -43,6 +45,8 @@ public class AsyncIggyTcpClient { private final Optional<Duration> requestTimeout; private final Optional<Integer> connectionPoolSize; private final Optional<RetryPolicy> retryPolicy; + private final boolean enableTls; + private final Optional<File> tlsCertificate; private AsyncTcpConnection connection; private MessagesClient messagesClient; private ConsumerGroupsClient consumerGroupsClient; @@ -51,12 +55,13 @@ public class AsyncIggyTcpClient { private UsersClient usersClient; public AsyncIggyTcpClient(String host, int port) { - this(host, port, null, null, null, null, null, null); + this(host, port, null, null, null, null, null, null, false, Optional.empty()); } private AsyncIggyTcpClient(String host, int port, String username, String password, Duration connectionTimeout, Duration requestTimeout, - Integer connectionPoolSize, RetryPolicy retryPolicy) { + Integer connectionPoolSize, RetryPolicy retryPolicy, + boolean enableTls, Optional<File> tlsCertificate) { this.host = host; this.port = port; this.username = Optional.ofNullable(username); @@ -65,6 +70,8 @@ public class AsyncIggyTcpClient { this.requestTimeout = Optional.ofNullable(requestTimeout); this.connectionPoolSize = Optional.ofNullable(connectionPoolSize); this.retryPolicy = Optional.ofNullable(retryPolicy); + this.enableTls = enableTls; + this.tlsCertificate = tlsCertificate; } /** @@ -80,23 +87,23 @@ public class AsyncIggyTcpClient { * Connects to the Iggy server asynchronously. */ public CompletableFuture<Void> connect() { - connection = new AsyncTcpConnection(host, port); + connection = new AsyncTcpConnection(host, port, enableTls, tlsCertificate); return connection.connect() - .thenRun(() -> { - messagesClient = new MessagesTcpClient(connection); - consumerGroupsClient = new ConsumerGroupsTcpClient(connection); - streamsClient = new StreamsTcpClient(connection); - topicsClient = new TopicsTcpClient(connection); - usersClient = new UsersTcpClient(connection); - }) - .thenCompose(v -> { - // Auto-login if credentials are provided - if (username.isPresent() && password.isPresent()) { - return usersClient.loginAsync(username.get(), password.get()) - .thenApply(identity -> null); - } - return CompletableFuture.completedFuture(null); - }); + .thenRun(() -> { + messagesClient = new MessagesTcpClient(connection); + consumerGroupsClient = new ConsumerGroupsTcpClient(connection); + streamsClient = new StreamsTcpClient(connection); + topicsClient = new TopicsTcpClient(connection); + usersClient = new UsersTcpClient(connection); + }) + .thenCompose(v -> { + // Auto-login if credentials are provided + if (username.isPresent() && password.isPresent()) { + return usersClient.loginAsync(username.get(), password.get()) + .thenApply(identity -> null); + } + return CompletableFuture.completedFuture(null); + }); } /** @@ -171,6 +178,8 @@ public class AsyncIggyTcpClient { private Duration requestTimeout; private Integer connectionPoolSize; private RetryPolicy retryPolicy; + private boolean enableTls = false; + private File tlsTrustedCertificatePem; private Builder() { } @@ -254,6 +263,49 @@ public class AsyncIggyTcpClient { return this; } + /** + * Enables or disables TLS for the TCP connection. + * + * @param enableTls whether to enable TLS + * @return this builder + */ + public Builder tls(boolean enableTls) { + this.enableTls = enableTls; + return this; + } + + /** + * Enables TLS for the TCP connection. + * + * @return this builder + */ + public Builder enableTls() { + this.enableTls = true; + return this; + } + + /** + * Sets a custom trusted certificate (PEM file) to validate the server certificate. + * + * @param certificate the PEM file containing the certificate or CA chain + * @return this builder + */ + public Builder tlsTrustedCertificate(File certificate) { + this.tlsTrustedCertificatePem = certificate; + return this; + } + + /** + * Sets a custom trusted certificate (PEM file path) to validate the server certificate. + * + * @param certificatePath the PEM file path containing the certificate or CA chain + * @return this builder + */ + public Builder tlsTrustedCertificate(String certificatePath) { + this.tlsTrustedCertificatePem = StringUtils.isBlank(certificatePath) ? null : new File(certificatePath); + return this; + } + /** * Builds and returns a configured AsyncIggyTcpClient instance. * Note: You still need to call connect() on the returned client. @@ -268,7 +320,8 @@ public class AsyncIggyTcpClient { throw new IllegalArgumentException("Port must be a positive integer"); } return new AsyncIggyTcpClient(host, port, username, password, - connectionTimeout, requestTimeout, connectionPoolSize, retryPolicy); + connectionTimeout, requestTimeout, connectionPoolSize, retryPolicy, + enableTls, Optional.ofNullable(tlsTrustedCertificatePem)); } } @@ -300,10 +353,10 @@ public class AsyncIggyTcpClient { /** * Creates a retry policy with exponential backoff and custom parameters. * - * @param maxRetries the maximum number of retries + * @param maxRetries the maximum number of retries * @param initialDelay the initial delay before the first retry - * @param maxDelay the maximum delay between retries - * @param multiplier the multiplier for exponential backoff + * @param maxDelay the maximum delay between retries + * @param multiplier the multiplier for exponential backoff * @return a RetryPolicy with custom exponential backoff configuration */ public static RetryPolicy exponentialBackoff(int maxRetries, Duration initialDelay, Duration maxDelay, double multiplier) { @@ -314,7 +367,7 @@ public class AsyncIggyTcpClient { * Creates a retry policy with fixed delay. * * @param maxRetries the maximum number of retries - * @param delay the fixed delay between retries + * @param delay the fixed delay between retries * @return a RetryPolicy with fixed delay configuration */ public static RetryPolicy fixedDelay(int maxRetries, Duration delay) { diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java index c1f5d14a3..faa1c8ba2 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java @@ -21,13 +21,25 @@ package org.apache.iggy.client.async.tcp; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLException; +import java.io.File; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -41,6 +53,9 @@ public class AsyncTcpConnection { private final String host; private final int port; + private final boolean enableTls; + private final Optional<File> tlsCertificate; + private final SslContext sslContext; private final EventLoopGroup eventLoopGroup; private final Bootstrap bootstrap; private Channel channel; @@ -48,35 +63,56 @@ public class AsyncTcpConnection { private final ConcurrentHashMap<Long, CompletableFuture<ByteBuf>> pendingRequests = new ConcurrentHashMap<>(); public AsyncTcpConnection(String host, int port) { + this(host, port, false, Optional.empty()); + } + + public AsyncTcpConnection(String host, int port, boolean enableTls, Optional<File> tlsCertificate) { this.host = host; this.port = port; + this.enableTls = enableTls; + this.tlsCertificate = tlsCertificate; this.eventLoopGroup = new NioEventLoopGroup(); this.bootstrap = new Bootstrap(); + if (this.enableTls) { + try { + SslContextBuilder builder = SslContextBuilder.forClient(); + this.tlsCertificate.ifPresent(builder::trustManager); + this.sslContext = builder.build(); + } catch (SSLException e) { + throw new RuntimeException("Failed to build SSL context for AsyncTcpConnection", e); + } + } else { + this.sslContext = null; + } configureBootstrap(); } private void configureBootstrap() { bootstrap.group(eventLoopGroup) - .channel(NioSocketChannel.class) - .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.SO_KEEPALIVE, true) - .handler(new ChannelInitializer<SocketChannel>() { - @Override - protected void initChannel(SocketChannel ch) { - ChannelPipeline pipeline = ch.pipeline(); - - // Custom frame decoder for Iggy protocol responses - pipeline.addLast("frameDecoder", new IggyFrameDecoder()); - - // No encoder needed - we build complete frames following Iggy protocol - // The protocol already includes the length field, so adding an encoder - // would duplicate it. This matches the blocking client implementation. - - // Response handler - pipeline.addLast("responseHandler", - new IggyResponseHandler(pendingRequests)); - } - }); + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + if (enableTls) { + pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), host, port)); + } + + // Custom frame decoder for Iggy protocol responses + pipeline.addLast("frameDecoder", new IggyFrameDecoder()); + + // No encoder needed - we build complete frames following Iggy protocol + // The protocol already includes the length field, so adding an encoder + // would duplicate it. This matches the blocking client implementation. + + // Response handler + pipeline.addLast("responseHandler", + new IggyResponseHandler(pendingRequests)); + } + }); } /** @@ -103,7 +139,7 @@ public class AsyncTcpConnection { public CompletableFuture<ByteBuf> sendAsync(int commandCode, ByteBuf payload) { if (channel == null || !channel.isActive()) { return CompletableFuture.failedFuture( - new IllegalStateException("Connection not established or closed")); + new IllegalStateException("Connection not established or closed")); } // Since Iggy doesn't use request IDs, we'll just use a simple queue @@ -132,7 +168,7 @@ public class AsyncTcpConnection { hex.append(String.format("%02x ", b)); } logger.trace("Sending frame with command: {}, payload size: {}, frame payload size (with command): {}, total frame size: {}", - commandCode, payloadSize, framePayloadSize, frame.readableBytes()); + commandCode, payloadSize, framePayloadSize, frame.readableBytes()); logger.trace("Frame bytes (hex): {}", hex.toString()); } @@ -195,8 +231,8 @@ public class AsyncTcpConnection { // Get the oldest pending request if (!pendingRequests.isEmpty()) { Long oldestRequestId = pendingRequests.keySet().stream() - .min(Long::compare) - .orElse(null); + .min(Long::compare) + .orElse(null); if (oldestRequestId != null) { CompletableFuture<ByteBuf> future = pendingRequests.remove(oldestRequestId); @@ -210,10 +246,10 @@ public class AsyncTcpConnection { byte[] errorBytes = new byte[length]; msg.readBytes(errorBytes); future.completeExceptionally( - new RuntimeException("Server error: " + new String(errorBytes))); + new RuntimeException("Server error: " + new String(errorBytes))); } else { future.completeExceptionally( - new RuntimeException("Server error with status: " + status)); + new RuntimeException("Server error with status: " + status)); } } } @@ -224,7 +260,7 @@ public class AsyncTcpConnection { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Fail all pending requests pendingRequests.values().forEach(future -> - future.completeExceptionally(cause)); + future.completeExceptionally(cause)); pendingRequests.clear(); ctx.close(); } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java index 3e389057e..ed39ad945 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java @@ -19,6 +19,7 @@ package org.apache.iggy.client.blocking.tcp; +import org.apache.commons.lang3.StringUtils; import org.apache.iggy.client.blocking.ConsumerGroupsClient; import org.apache.iggy.client.blocking.ConsumerOffsetsClient; import org.apache.iggy.client.blocking.IggyBaseClient; @@ -29,6 +30,8 @@ import org.apache.iggy.client.blocking.StreamsClient; import org.apache.iggy.client.blocking.SystemClient; import org.apache.iggy.client.blocking.TopicsClient; import org.apache.iggy.client.blocking.UsersClient; + +import java.io.File; import java.time.Duration; import java.util.Optional; @@ -51,14 +54,17 @@ public class IggyTcpClient implements IggyBaseClient { private final Optional<Duration> requestTimeout; private final Optional<Integer> connectionPoolSize; private final Optional<RetryPolicy> retryPolicy; + private final boolean enableTls; + private final Optional<File> tlsCertificate; public IggyTcpClient(String host, Integer port) { - this(host, port, null, null, null, null, null, null); + this(host, port, null, null, null, null, null, null, false, Optional.empty()); } private IggyTcpClient(String host, Integer port, String username, String password, Duration connectionTimeout, Duration requestTimeout, - Integer connectionPoolSize, RetryPolicy retryPolicy) { + Integer connectionPoolSize, RetryPolicy retryPolicy, + boolean enableTls, Optional<File> tlsCertificate) { this.host = host; this.port = port; this.username = Optional.ofNullable(username); @@ -67,8 +73,10 @@ public class IggyTcpClient implements IggyBaseClient { this.requestTimeout = Optional.ofNullable(requestTimeout); this.connectionPoolSize = Optional.ofNullable(connectionPoolSize); this.retryPolicy = Optional.ofNullable(retryPolicy); + this.enableTls = enableTls; + this.tlsCertificate = tlsCertificate; - InternalTcpClient tcpClient = new InternalTcpClient(host, port); + InternalTcpClient tcpClient = new InternalTcpClient(host, port, enableTls, tlsCertificate); tcpClient.connect(); usersClient = new UsersTcpClient(tcpClient); streamsClient = new StreamsTcpClient(tcpClient); @@ -152,6 +160,8 @@ public class IggyTcpClient implements IggyBaseClient { private Duration requestTimeout; private Integer connectionPoolSize; private RetryPolicy retryPolicy; + private boolean enableTls = false; + private File tlsCertificate; private Builder() { } @@ -235,6 +245,49 @@ public class IggyTcpClient implements IggyBaseClient { return this; } + /** + * Enables or disables TLS for the TCP connection. + * + * @param enableTls whether to enable TLS + * @return this builder + */ + public Builder tls(boolean enableTls) { + this.enableTls = enableTls; + return this; + } + + /** + * Enables TLS for the TCP connection. + * + * @return this builder + */ + public Builder enableTls() { + this.enableTls = true; + return this; + } + + /** + * Sets a custom trusted certificate (PEM file) to validate the server certificate. + * + * @param tlsCertificate the PEM file containing the certificate or CA chain + * @return this builder + */ + public Builder tlsCertificate(File tlsCertificate) { + this.tlsCertificate = tlsCertificate; + return this; + } + + /** + * Sets a custom trusted certificate (PEM file path) to validate the server certificate. + * + * @param tlsCertificatePath the PEM file path containing the certificate or CA chain + * @return this builder + */ + public Builder tlsCertificate(String tlsCertificatePath) { + this.tlsCertificate = StringUtils.isBlank(tlsCertificatePath) ? null : new File(tlsCertificatePath); + return this; + } + /** * Builds and returns a configured IggyTcpClient instance. * @@ -247,8 +300,10 @@ public class IggyTcpClient implements IggyBaseClient { if (port == null || port <= 0) { throw new IllegalArgumentException("Port must be a positive integer"); } + boolean enableTls = this.enableTls; return new IggyTcpClient(host, port, username, password, - connectionTimeout, requestTimeout, connectionPoolSize, retryPolicy); + connectionTimeout, requestTimeout, connectionPoolSize, retryPolicy, + enableTls, Optional.ofNullable(tlsCertificate)); } } @@ -280,10 +335,10 @@ public class IggyTcpClient implements IggyBaseClient { /** * Creates a retry policy with exponential backoff and custom parameters. * - * @param maxRetries the maximum number of retries + * @param maxRetries the maximum number of retries * @param initialDelay the initial delay before the first retry - * @param maxDelay the maximum delay between retries - * @param multiplier the multiplier for exponential backoff + * @param maxDelay the maximum delay between retries + * @param multiplier the multiplier for exponential backoff * @return a RetryPolicy with custom exponential backoff configuration */ public static RetryPolicy exponentialBackoff(int maxRetries, Duration initialDelay, Duration maxDelay, double multiplier) { @@ -294,7 +349,7 @@ public class IggyTcpClient implements IggyBaseClient { * Creates a retry policy with fixed delay. * * @param maxRetries the maximum number of retries - * @param delay the fixed delay between retries + * @param delay the fixed delay between retries * @return a RetryPolicy with fixed delay configuration */ public static RetryPolicy fixedDelay(int maxRetries, Duration delay) { diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java index 59b89c3b9..d01190047 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java @@ -23,10 +23,16 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.tcp.TcpClient; + +import javax.net.ssl.SSLException; +import java.io.File; import java.util.List; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -41,10 +47,27 @@ final class InternalTcpClient { private Connection connection; InternalTcpClient(String host, Integer port) { - client = TcpClient.create() + this(host, port, false, Optional.empty()); + } + + InternalTcpClient(String host, Integer port, boolean enableTls, Optional<File> tlsCertificate) { + TcpClient tcpClient = TcpClient.create() .host(host) .port(port) .doOnConnected(conn -> conn.addHandlerLast(new IggyResponseDecoder())); + + if (enableTls) { + try { + SslContextBuilder builder = SslContextBuilder.forClient(); + tlsCertificate.ifPresent(builder::trustManager); + SslContext sslContext = builder.build(); + tcpClient = tcpClient.secure(sslSpec -> sslSpec.sslContext(sslContext)); + } catch (SSLException e) { + throw new RuntimeException("Failed to configure TLS for TcpClient", e); + } + } + + client = tcpClient; } void connect() { @@ -56,7 +79,9 @@ final class InternalTcpClient { return send(code.getValue()); } - /** Use {@link #send(CommandCode)} instead. */ + /** + * Use {@link #send(CommandCode)} instead. + */ @Deprecated ByteBuf send(int command) { return send(command, Unpooled.EMPTY_BUFFER); @@ -66,7 +91,9 @@ final class InternalTcpClient { return send(code.getValue(), payload); } - /** Use {@link #send(CommandCode, ByteBuf)} instead. */ + /** + * Use {@link #send(CommandCode, ByteBuf)} instead. + */ @Deprecated ByteBuf send(int command, ByteBuf payload) { var payloadSize = payload.readableBytes() + COMMAND_LENGTH;
