This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch java-tcp-tls
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 6d8386304eb34c1b8ece8d26b2ace1d73cc7cd2a
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Mon Nov 17 12:39:20 2025 +0100

    feat(java): add TLS support to TCP connection
---
 core/certs/{iggy_cert.pem => iggy_ca_cert.pem}     |  0
 core/certs/iggy_cert.pem                           | 36 ++++----
 core/certs/iggy_key.pem                            | 52 ++++++------
 core/connectors/runtime/README.md                  |  2 +-
 core/connectors/runtime/config.toml                |  2 +-
 core/connectors/runtime/example_config/config.toml |  4 +
 .../iggy/client/async/tcp/AsyncIggyTcpClient.java  | 99 +++++++++++++++++-----
 .../iggy/client/async/tcp/AsyncTcpConnection.java  | 92 ++++++++++++++------
 .../iggy/client/blocking/tcp/IggyTcpClient.java    | 71 ++++++++++++++--
 .../client/blocking/tcp/InternalTcpClient.java     | 33 +++++++-
 10 files changed, 283 insertions(+), 108 deletions(-)

diff --git a/core/certs/iggy_cert.pem b/core/certs/iggy_ca_cert.pem
similarity index 100%
copy from core/certs/iggy_cert.pem
copy to core/certs/iggy_ca_cert.pem
diff --git a/core/certs/iggy_cert.pem b/core/certs/iggy_cert.pem
index 2ff51e122..db2ab1360 100644
--- a/core/certs/iggy_cert.pem
+++ b/core/certs/iggy_cert.pem
@@ -1,21 +1,21 @@
 -----BEGIN CERTIFICATE-----
-MIIDezCCAmOgAwIBAgIUFXqPQOAPI6HS7GBAK0JF2k5OIv0wDQYJKoZIhvcNAQEL
+MIIDaDCCAlCgAwIBAgIUIiNw0u8mef9MUpyav3fBW2WPDpcwDQYJKoZIhvcNAQEL
 BQAwPzELMAkGA1UEBhMCUEwxDTALBgNVBAoMBElnZ3kxDTALBgNVBAsMBElnZ3kx
-EjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0yNTA3MDkxMTM5MzdaFw0zNTA3MDcxMTM5
-MzdaMD8xCzAJBgNVBAYTAlBMMQ0wCwYDVQQKDARJZ2d5MQ0wCwYDVQQLDARJZ2d5
-MRIwEAYDVQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
-AoIBAQDD/vCGnd4anscfJ1p1PgAiMPIy1KH9/gHIIN1SYq8SjAyz0YR09POpMmIq
-B2UlrPCE86mE/80Ai/qsfsxe9T3r+Ymnb9wThuAbOmtu8Lpu5xcq34lnqUieaBVj
-qFT2h+qM2ppLYIHgDAeNdcKtCSOQF0hq4jtyszKmyOccn4gzWtAf+3P9wtJ/kWGu
-8zACxPERo5/sFym2DvHB/8YwOp6ypsWqi4lMwoM29L7AsIA/+QN9JQBrITGTWYTS
-ZNvVOrZJCzxlkppxvTOmPa92BVM3Ks/eKz7I0YMO/lGzRod77VnvnSayBVXQKsoz
-XgBNozmHdikaCZBJevJhppPArakjAgMBAAGjbzBtMB0GA1UdDgQWBBRpAPK7Vrdo
-oPKKXBrhbT4IG69i8TAfBgNVHSMEGDAWgBRpAPK7VrdooPKKXBrhbT4IG69i8TAP
-BgNVHRMBAf8EBTADAQH/MBoGA1UdEQQTMBGCCWxvY2FsaG9zdIcEfwAAATANBgkq
-hkiG9w0BAQsFAAOCAQEAkzhSsXK23WwBnm/x7lE3K1qGXMiF7DAGlLpaeCoIXuDj
-q28syAxuwPR0lJkcl0NQky53w/cPf3XnknKU2PBbUWCPS3SnTf9oAJKFL+AUv8UD
-4DYjulJJVYivvCrzQJYSNp6qQHE9FW06qCnOqClPBUrPgx4RYD9L139fu9nn64w4
-u05Jm86gXTdQfyPUf2MVegsozYcDzyOncnxt5PLkfpr44qTh3l3drVqabKUuz+yw
-WHl0yhntS4+vF2MHlleXzsVtKQSraTi6ZBEdIATo3jdSpncIg8rLVu6uM01PweJ8
-0NKgTLE3/Q83PMKQ8tErXYHqL4XzKlipSvSa/+/16g==
+EjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0yNTExMTcwOTIxMDhaFw0yNjExMTcwOTIx
+MDhaMBQxEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEP
+ADCCAQoCggEBANnLgvBKLRSXQWJMIO/BM0DSHo6SnCP+RAdsjTLJGSt+ylkJsl7W
+AVEtDXkoNxuyMa2hTqtUaom1sze0heRgcGkXlnnx6hNs23yyyQ/32SIh81RPH0RM
+xi7QGCI9VPLD+aRFiWDIRCit+QEbYfIkS7k2xkIipJsoqpBLJyFMM2z6+KEazJbU
+0Ajxxw81nbOSFSGSMD7dIVgnod+GypMBKhzlsevn5MTcEJ1VPWAnwSLsFUeZ1MrU
+BOi2brwrD7yY79YA7cgZvytTA39fuwawpk7A8NAAgdgqEsD2TOANYx2aOLp1nX7L
+sqYeRc8wiuC3+Y3bMUA+lvG6kGwE4mygarMCAwEAAaOBhjCBgzAJBgNVHRMEAjAA
+MAsGA1UdDwQEAwIFoDATBgNVHSUEDDAKBggrBgEFBQcDATAUBgNVHREEDTALggls
+b2NhbGhvc3QwHQYDVR0OBBYEFPYlnpnKLvLJJ74clCV5h/HCq/uHMB8GA1UdIwQY
+MBaAFGkA8rtWt2ig8opcGuFtPggbr2LxMA0GCSqGSIb3DQEBCwUAA4IBAQAUM31s
+HbYJ7F4ywyPyScHTtNvk9n4dDhpakjpEqk7W0sMiNtwld083EMw8pa6HEfzouhIx
+SIZR57Ft48TYFUyB9rFYUFauXjTiJmJVGSSkMtQU7xSRrOQ73S6Y2GFKDnVxJA+k
+I6IJvWpMMylbPpDrF1ye/s4ar4kL0r7Xt/bEgIKyTIlmYLeG9UQH72bSn2R3GVnG
+/ta/AX+V//fYXGXFIX1WqwHSzgb7lkrRKgyezM1qUM9yYf/Do/zxdAxxnYukg0mG
+wP/239ntWuh3n2Pq20g3PvdhvRDd4bFqBTMFppnS5Pc4/Y/RV6xhCmWZaYrFWNQd
+LaQdtG8xF3PV+1g4
 -----END CERTIFICATE-----
diff --git a/core/certs/iggy_key.pem b/core/certs/iggy_key.pem
index 15df37f10..58826332b 100644
--- a/core/certs/iggy_key.pem
+++ b/core/certs/iggy_key.pem
@@ -1,28 +1,28 @@
 -----BEGIN PRIVATE KEY-----
-MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDD/vCGnd4anscf
-J1p1PgAiMPIy1KH9/gHIIN1SYq8SjAyz0YR09POpMmIqB2UlrPCE86mE/80Ai/qs
-fsxe9T3r+Ymnb9wThuAbOmtu8Lpu5xcq34lnqUieaBVjqFT2h+qM2ppLYIHgDAeN
-dcKtCSOQF0hq4jtyszKmyOccn4gzWtAf+3P9wtJ/kWGu8zACxPERo5/sFym2DvHB
-/8YwOp6ypsWqi4lMwoM29L7AsIA/+QN9JQBrITGTWYTSZNvVOrZJCzxlkppxvTOm
-Pa92BVM3Ks/eKz7I0YMO/lGzRod77VnvnSayBVXQKsozXgBNozmHdikaCZBJevJh
-ppPArakjAgMBAAECggEACtymF8/H71G9V0ynBrhY6Yws4ARItfrNvnmTym0Npsl8
-KrsBzDmHB2lzZ0DhqiTbb4LRcj167vzS95xPVzPgKqO5rT3VxH9FDD1AlvVgX6UO
-86FPzB7ll2HNTaiWjNMbfJLg2iyPBgukn4JXN5+6CNJBgSrogILN29Bmfn+Y3L+i
-nUlSoUXk2M0+7cRiwPWAYv9PXf9DuG5q53cKdnLff0brik223FvcVGxUNI1N6Lae
-o1f3SwVP5eGlh8EcRQJn4l1WQfPt/NQ1BiT341aP91Or9WKYM9kSritZTZO55Thd
-FblK0FHYZ5J4EWh8ThDz0mGaktloIdE/KTx2djd8gQKBgQDEMK6fQJqNhS7kBD11
-L3nBZTJUNpmLM9WjvnwfcTP/S+t85XbnVE5ITCmVPKz/chqol0ZDMMO11K+W3APd
-+ZQ12O8buKHgT6tTlKoiOP0uwbnAdLjLZOWsQVddbhBErKPRu+G+5nyHVPTtdQG7
-qEYoe2xg/PNLr5/arNGLHUJixQKBgQD/vxfVHqvJBGMS4bx1p9txnKzmZqT4L79F
-tG83RotHSPUqO/hbVv5U1eI8oBnifptMqaI38Uzs2+Bilg1BF4PCo55W25aC0dg6
-r+KiFIq6mUX5WKH8han4IB34JvjV6jGvzuma/oTO8fxfSimuMWX0nkODAb2HeZ4f
-/VEbnw16xwKBgQCM2dKUfOI41jRK+YxR9Iq6QPf4I9bqbIVl8JzWSgSlthDT8z4B
-aLJnD18PSKd/IaFoBmsoU/s+MPr7GOwh0kwIuL05rr1w+GGtON9IgJesmOLN/D6r
-r72AhMy/RASj+ToHmpbA4mLnGiRZ0pYy7uWnrKyrmQ7m+KiQa2BpOtFtbQKBgBwV
-wys/VAQePDcNnSGajmm6l+4xZXpv2+RuvSvnzlHEvE+oCE0Xj5SHbHHV7yHFX46a
-rlrQX+8+8jRBYDE+wNR2HWRSdwPkwYcoW44LDXUScfHA/wD5OMIr3L6soPT05AH3
-igXgX3tObbWVMmCTwiuL2cQgQ80F2QyQSADthZBvAoGBAKRxpTDwBLZ9rhQUAUIy
-/d0htT6eNG3fqau2WcDfvfQ4ykdFieOCpZH9OYdCDqPDB0g/RZ6dUOKpjH3sTpdJ
-u5ZsQ9iqCvE2MHplqdmAn7H3sI1FKeh1TN68voa85Y+KsDurJQ4X3pNRmWinntQs
-+PYpwYdDwwwaxnUeTgPUEjph
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDZy4LwSi0Ul0Fi
+TCDvwTNA0h6Okpwj/kQHbI0yyRkrfspZCbJe1gFRLQ15KDcbsjGtoU6rVGqJtbM3
+tIXkYHBpF5Z58eoTbNt8sskP99kiIfNUTx9ETMYu0BgiPVTyw/mkRYlgyEQorfkB
+G2HyJEu5NsZCIqSbKKqQSychTDNs+vihGsyW1NAI8ccPNZ2zkhUhkjA+3SFYJ6Hf
+hsqTASoc5bHr5+TE3BCdVT1gJ8Ei7BVHmdTK1ATotm68Kw+8mO/WAO3IGb8rUwN/
+X7sGsKZOwPDQAIHYKhLA9kzgDWMdmji6dZ1+y7KmHkXPMIrgt/mN2zFAPpbxupBs
+BOJsoGqzAgMBAAECggEAB3SnjVlEtM0+wEIx6HM5MXFf+Wp8bOEB7C0jokbvFSWY
+gLbLw+JYljJQIUMmq8yMVdDNVCEmKoOtWG1WHgzNHFPh41fMNxEFbH3kUycdaEU4
+Qr0YqWplaGnYQafO1iauT5jPzJ/ecXQPL6ID1tm2O+drnwz3jJY2TI0+EC4/Hm5i
+yRBft++4WHz6wtQBzC63KUW4OqtpNVbLRt4bbOFsLLnkD6OmL/dxgRZiiMVpPZe6
+48lJZ+2HIAVnsii9sUeYJFvUbxbw/RwI5WmEq232YXk06yQxq2Ww5hTMF7CBxPUd
+9oF+juV0KaJ7tlbp1/dSjmB/YipVEOkgxEpLYifo0QKBgQDa9d0DQEywBnq7RbKM
+t0vdZOdrin+S2sIHsF6Or5wTqA3eDOSmHwQ3/wXkjLksKDv0jOn32IuOdJTAhgXH
+WTlilYhBGMK5KOjGQuKIBLeV+kmA/BhWsjXfJruNdBO9ZNcq0uADWZoWD/801Glx
+QLkcFGoAp+1mGBqVfmNAW12rowKBgQD+oy243wLfWBDHAJhj3OSseGMqcadXSlVi
+xEByxnxql/sN1ZMnaev9TOguEyrNAWx20kl8nYJAiEShMii5ESBJwz3gNq+BPVlL
+Kdeeuyd4WJ8Z+rDKfSXckIILiG8C85jmlKIW7ymeD0lq6p4AIr5kU5wB6A26F/Zl
+dPSu95w1sQKBgQC1lndPbfDrfsQkMV4tQwogtsERt8+rK88Eb2lL7imDR7kQcSLi
+/hASnGX3sBkVnNx0KLUUvbqnTtnafIuoUr+7mYVhbzZ2No4tdmTGJxtVvzdcSDWr
+GKqCwW2Dl0OTq2CifDYZsSKPr36YApzbtrCNsARlPN8t70mEt0d6pQse/wKBgCw4
+QDdrmv8YNwmoA87LzHZbDbWQOSRCh2N4e0yzRWXpcLKtVTmx/kZltahSscsaJTDZ
+QeO/IqB8SZbItCO3YqkMm9E1DfNeqQQRhx1MmQUgNrj3PP5dD1cnTT4delHD5did
+FSzwaYTGWNSVW8zgO7oGfwAhwUiA4swprFg6LclxAoGBALv07o4gyP9bYu/lVTYb
+ygQduofL+ZIiqm1bbnhaA8TtqR62YfjPdMVt39LxNvIsba12Kq1ouapXcRI8J99M
+iA7KXgjAkhtqBpOiBFeLFq9giVQkLjCMM2v5zfFfQ+K2Coij/QNG4WJiRwroHDw+
+ZHX4bn6EJY3cVDMDPXmOxMEj
 -----END PRIVATE KEY-----
diff --git a/core/connectors/runtime/README.md 
b/core/connectors/runtime/README.md
index a70c62b9d..483047a4f 100644
--- a/core/connectors/runtime/README.md
+++ b/core/connectors/runtime/README.md
@@ -25,7 +25,7 @@ token = "" # Personal Access Token (PAT) can be used instead 
of username and pas
 
 [iggy.tls] # Optional TLS configuration for Iggy TCP connection
 enabled = false
-ca_file = "core/certs/iggy_cert.pem"
+ca_file = "core/certs/iggy_ca_cert.pem"
 domain = "" # Optional domain for TLS connection
 
 [state]
diff --git a/core/connectors/runtime/config.toml 
b/core/connectors/runtime/config.toml
index fef75d678..dadcc0f35 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -42,7 +42,7 @@ token = "" # Personal Access Token (PAT) can be used instead 
of username and pas
 
 [iggy.tls] # Optional TLS configuration for Iggy TCP connection
 enabled = false
-ca_file = "core/certs/iggy_cert.pem"
+ca_file = "core/certs/iggy_ca_cert.pem"
 domain = "" # Optional domain for TLS connection
 
 [state]
diff --git a/core/connectors/runtime/example_config/config.toml 
b/core/connectors/runtime/example_config/config.toml
index 431f475ad..6bcbc9b22 100644
--- a/core/connectors/runtime/example_config/config.toml
+++ b/core/connectors/runtime/example_config/config.toml
@@ -40,6 +40,10 @@ username = "iggy"
 password = "iggy"
 token = "" # Personal Access Token (PAT) can be used instead of username and 
password
 
+[iggy.tls]
+enabled = false
+ca_file = "core/certs/iggy_ca_cert.pem"
+
 [state]
 path = "local_state"
 
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
index 6cada2006..4c8bf92b9 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
@@ -19,12 +19,14 @@
 
 package org.apache.iggy.client.async.tcp;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.iggy.client.async.ConsumerGroupsClient;
 import org.apache.iggy.client.async.MessagesClient;
 import org.apache.iggy.client.async.StreamsClient;
 import org.apache.iggy.client.async.TopicsClient;
 import org.apache.iggy.client.async.UsersClient;
 
+import java.io.File;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -43,6 +45,8 @@ public class AsyncIggyTcpClient {
     private final Optional<Duration> requestTimeout;
     private final Optional<Integer> connectionPoolSize;
     private final Optional<RetryPolicy> retryPolicy;
+    private final boolean enableTls;
+    private final Optional<File> tlsCertificate;
     private AsyncTcpConnection connection;
     private MessagesClient messagesClient;
     private ConsumerGroupsClient consumerGroupsClient;
@@ -51,12 +55,13 @@ public class AsyncIggyTcpClient {
     private UsersClient usersClient;
 
     public AsyncIggyTcpClient(String host, int port) {
-        this(host, port, null, null, null, null, null, null);
+        this(host, port, null, null, null, null, null, null, false, 
Optional.empty());
     }
 
     private AsyncIggyTcpClient(String host, int port, String username, String 
password,
                                Duration connectionTimeout, Duration 
requestTimeout,
-                               Integer connectionPoolSize, RetryPolicy 
retryPolicy) {
+                               Integer connectionPoolSize, RetryPolicy 
retryPolicy,
+                               boolean enableTls, Optional<File> 
tlsCertificate) {
         this.host = host;
         this.port = port;
         this.username = Optional.ofNullable(username);
@@ -65,6 +70,8 @@ public class AsyncIggyTcpClient {
         this.requestTimeout = Optional.ofNullable(requestTimeout);
         this.connectionPoolSize = Optional.ofNullable(connectionPoolSize);
         this.retryPolicy = Optional.ofNullable(retryPolicy);
+        this.enableTls = enableTls;
+        this.tlsCertificate = tlsCertificate;
     }
 
     /**
@@ -80,23 +87,23 @@ public class AsyncIggyTcpClient {
      * Connects to the Iggy server asynchronously.
      */
     public CompletableFuture<Void> connect() {
-        connection = new AsyncTcpConnection(host, port);
+        connection = new AsyncTcpConnection(host, port, enableTls, 
tlsCertificate);
         return connection.connect()
-            .thenRun(() -> {
-                messagesClient = new MessagesTcpClient(connection);
-                consumerGroupsClient = new ConsumerGroupsTcpClient(connection);
-                streamsClient = new StreamsTcpClient(connection);
-                topicsClient = new TopicsTcpClient(connection);
-                usersClient = new UsersTcpClient(connection);
-            })
-            .thenCompose(v -> {
-                // Auto-login if credentials are provided
-                if (username.isPresent() && password.isPresent()) {
-                    return usersClient.loginAsync(username.get(), 
password.get())
-                            .thenApply(identity -> null);
-                }
-                return CompletableFuture.completedFuture(null);
-            });
+                .thenRun(() -> {
+                    messagesClient = new MessagesTcpClient(connection);
+                    consumerGroupsClient = new 
ConsumerGroupsTcpClient(connection);
+                    streamsClient = new StreamsTcpClient(connection);
+                    topicsClient = new TopicsTcpClient(connection);
+                    usersClient = new UsersTcpClient(connection);
+                })
+                .thenCompose(v -> {
+                    // Auto-login if credentials are provided
+                    if (username.isPresent() && password.isPresent()) {
+                        return usersClient.loginAsync(username.get(), 
password.get())
+                                .thenApply(identity -> null);
+                    }
+                    return CompletableFuture.completedFuture(null);
+                });
     }
 
     /**
@@ -171,6 +178,8 @@ public class AsyncIggyTcpClient {
         private Duration requestTimeout;
         private Integer connectionPoolSize;
         private RetryPolicy retryPolicy;
+        private boolean enableTls = false;
+        private File tlsTrustedCertificatePem;
 
         private Builder() {
         }
@@ -254,6 +263,49 @@ public class AsyncIggyTcpClient {
             return this;
         }
 
+        /**
+         * Enables or disables TLS for the TCP connection.
+         *
+         * @param enableTls whether to enable TLS
+         * @return this builder
+         */
+        public Builder tls(boolean enableTls) {
+            this.enableTls = enableTls;
+            return this;
+        }
+
+        /**
+         * Enables TLS for the TCP connection.
+         *
+         * @return this builder
+         */
+        public Builder enableTls() {
+            this.enableTls = true;
+            return this;
+        }
+
+        /**
+         * Sets a custom trusted certificate (PEM file) to validate the server 
certificate.
+         *
+         * @param certificate the PEM file containing the certificate or CA 
chain
+         * @return this builder
+         */
+        public Builder tlsTrustedCertificate(File certificate) {
+            this.tlsTrustedCertificatePem = certificate;
+            return this;
+        }
+
+        /**
+         * Sets a custom trusted certificate (PEM file path) to validate the 
server certificate.
+         *
+         * @param certificatePath the PEM file path containing the certificate 
or CA chain
+         * @return this builder
+         */
+        public Builder tlsTrustedCertificate(String certificatePath) {
+            this.tlsTrustedCertificatePem = 
StringUtils.isBlank(certificatePath) ? null : new File(certificatePath);
+            return this;
+        }
+
         /**
          * Builds and returns a configured AsyncIggyTcpClient instance.
          * Note: You still need to call connect() on the returned client.
@@ -268,7 +320,8 @@ public class AsyncIggyTcpClient {
                 throw new IllegalArgumentException("Port must be a positive 
integer");
             }
             return new AsyncIggyTcpClient(host, port, username, password,
-                    connectionTimeout, requestTimeout, connectionPoolSize, 
retryPolicy);
+                    connectionTimeout, requestTimeout, connectionPoolSize, 
retryPolicy,
+                    enableTls, Optional.ofNullable(tlsTrustedCertificatePem));
         }
     }
 
@@ -300,10 +353,10 @@ public class AsyncIggyTcpClient {
         /**
          * Creates a retry policy with exponential backoff and custom 
parameters.
          *
-         * @param maxRetries the maximum number of retries
+         * @param maxRetries   the maximum number of retries
          * @param initialDelay the initial delay before the first retry
-         * @param maxDelay the maximum delay between retries
-         * @param multiplier the multiplier for exponential backoff
+         * @param maxDelay     the maximum delay between retries
+         * @param multiplier   the multiplier for exponential backoff
          * @return a RetryPolicy with custom exponential backoff configuration
          */
         public static RetryPolicy exponentialBackoff(int maxRetries, Duration 
initialDelay, Duration maxDelay, double multiplier) {
@@ -314,7 +367,7 @@ public class AsyncIggyTcpClient {
          * Creates a retry policy with fixed delay.
          *
          * @param maxRetries the maximum number of retries
-         * @param delay the fixed delay between retries
+         * @param delay      the fixed delay between retries
          * @return a RetryPolicy with fixed delay configuration
          */
         public static RetryPolicy fixedDelay(int maxRetries, Duration delay) {
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
index c1f5d14a3..faa1c8ba2 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
@@ -21,13 +21,25 @@ package org.apache.iggy.client.async.tcp;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.*;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLException;
+import java.io.File;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -41,6 +53,9 @@ public class AsyncTcpConnection {
 
     private final String host;
     private final int port;
+    private final boolean enableTls;
+    private final Optional<File> tlsCertificate;
+    private final SslContext sslContext;
     private final EventLoopGroup eventLoopGroup;
     private final Bootstrap bootstrap;
     private Channel channel;
@@ -48,35 +63,56 @@ public class AsyncTcpConnection {
     private final ConcurrentHashMap<Long, CompletableFuture<ByteBuf>> 
pendingRequests = new ConcurrentHashMap<>();
 
     public AsyncTcpConnection(String host, int port) {
+        this(host, port, false, Optional.empty());
+    }
+
+    public AsyncTcpConnection(String host, int port, boolean enableTls, 
Optional<File> tlsCertificate) {
         this.host = host;
         this.port = port;
+        this.enableTls = enableTls;
+        this.tlsCertificate = tlsCertificate;
         this.eventLoopGroup = new NioEventLoopGroup();
         this.bootstrap = new Bootstrap();
+        if (this.enableTls) {
+            try {
+                SslContextBuilder builder = SslContextBuilder.forClient();
+                this.tlsCertificate.ifPresent(builder::trustManager);
+                this.sslContext = builder.build();
+            } catch (SSLException e) {
+                throw new RuntimeException("Failed to build SSL context for 
AsyncTcpConnection", e);
+            }
+        } else {
+            this.sslContext = null;
+        }
         configureBootstrap();
     }
 
     private void configureBootstrap() {
         bootstrap.group(eventLoopGroup)
-            .channel(NioSocketChannel.class)
-            .option(ChannelOption.TCP_NODELAY, true)
-            .option(ChannelOption.SO_KEEPALIVE, true)
-            .handler(new ChannelInitializer<SocketChannel>() {
-                @Override
-                protected void initChannel(SocketChannel ch) {
-                    ChannelPipeline pipeline = ch.pipeline();
-
-                    // Custom frame decoder for Iggy protocol responses
-                    pipeline.addLast("frameDecoder", new IggyFrameDecoder());
-
-                    // No encoder needed - we build complete frames following 
Iggy protocol
-                    // The protocol already includes the length field, so 
adding an encoder
-                    // would duplicate it. This matches the blocking client 
implementation.
-
-                    // Response handler
-                    pipeline.addLast("responseHandler",
-                        new IggyResponseHandler(pendingRequests));
-                }
-            });
+                .channel(NioSocketChannel.class)
+                .option(ChannelOption.TCP_NODELAY, true)
+                .option(ChannelOption.SO_KEEPALIVE, true)
+                .handler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    protected void initChannel(SocketChannel ch) {
+                        ChannelPipeline pipeline = ch.pipeline();
+
+                        if (enableTls) {
+                            pipeline.addLast("ssl", 
sslContext.newHandler(ch.alloc(), host, port));
+                        }
+
+                        // Custom frame decoder for Iggy protocol responses
+                        pipeline.addLast("frameDecoder", new 
IggyFrameDecoder());
+
+                        // No encoder needed - we build complete frames 
following Iggy protocol
+                        // The protocol already includes the length field, so 
adding an encoder
+                        // would duplicate it. This matches the blocking 
client implementation.
+
+                        // Response handler
+                        pipeline.addLast("responseHandler",
+                                new IggyResponseHandler(pendingRequests));
+                    }
+                });
     }
 
     /**
@@ -103,7 +139,7 @@ public class AsyncTcpConnection {
     public CompletableFuture<ByteBuf> sendAsync(int commandCode, ByteBuf 
payload) {
         if (channel == null || !channel.isActive()) {
             return CompletableFuture.failedFuture(
-                new IllegalStateException("Connection not established or 
closed"));
+                    new IllegalStateException("Connection not established or 
closed"));
         }
 
         // Since Iggy doesn't use request IDs, we'll just use a simple queue
@@ -132,7 +168,7 @@ public class AsyncTcpConnection {
                 hex.append(String.format("%02x ", b));
             }
             logger.trace("Sending frame with command: {}, payload size: {}, 
frame payload size (with command): {}, total frame size: {}",
-                commandCode, payloadSize, framePayloadSize, 
frame.readableBytes());
+                    commandCode, payloadSize, framePayloadSize, 
frame.readableBytes());
             logger.trace("Frame bytes (hex): {}", hex.toString());
         }
 
@@ -195,8 +231,8 @@ public class AsyncTcpConnection {
             // Get the oldest pending request
             if (!pendingRequests.isEmpty()) {
                 Long oldestRequestId = pendingRequests.keySet().stream()
-                    .min(Long::compare)
-                    .orElse(null);
+                        .min(Long::compare)
+                        .orElse(null);
 
                 if (oldestRequestId != null) {
                     CompletableFuture<ByteBuf> future = 
pendingRequests.remove(oldestRequestId);
@@ -210,10 +246,10 @@ public class AsyncTcpConnection {
                             byte[] errorBytes = new byte[length];
                             msg.readBytes(errorBytes);
                             future.completeExceptionally(
-                                new RuntimeException("Server error: " + new 
String(errorBytes)));
+                                    new RuntimeException("Server error: " + 
new String(errorBytes)));
                         } else {
                             future.completeExceptionally(
-                                new RuntimeException("Server error with 
status: " + status));
+                                    new RuntimeException("Server error with 
status: " + status));
                         }
                     }
                 }
@@ -224,7 +260,7 @@ public class AsyncTcpConnection {
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) {
             // Fail all pending requests
             pendingRequests.values().forEach(future ->
-                future.completeExceptionally(cause));
+                    future.completeExceptionally(cause));
             pendingRequests.clear();
             ctx.close();
         }
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java
index 3e389057e..ed39ad945 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java
@@ -19,6 +19,7 @@
 
 package org.apache.iggy.client.blocking.tcp;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.iggy.client.blocking.ConsumerGroupsClient;
 import org.apache.iggy.client.blocking.ConsumerOffsetsClient;
 import org.apache.iggy.client.blocking.IggyBaseClient;
@@ -29,6 +30,8 @@ import org.apache.iggy.client.blocking.StreamsClient;
 import org.apache.iggy.client.blocking.SystemClient;
 import org.apache.iggy.client.blocking.TopicsClient;
 import org.apache.iggy.client.blocking.UsersClient;
+
+import java.io.File;
 import java.time.Duration;
 import java.util.Optional;
 
@@ -51,14 +54,17 @@ public class IggyTcpClient implements IggyBaseClient {
     private final Optional<Duration> requestTimeout;
     private final Optional<Integer> connectionPoolSize;
     private final Optional<RetryPolicy> retryPolicy;
+    private final boolean enableTls;
+    private final Optional<File> tlsCertificate;
 
     public IggyTcpClient(String host, Integer port) {
-        this(host, port, null, null, null, null, null, null);
+        this(host, port, null, null, null, null, null, null, false, 
Optional.empty());
     }
 
     private IggyTcpClient(String host, Integer port, String username, String 
password,
                           Duration connectionTimeout, Duration requestTimeout,
-                          Integer connectionPoolSize, RetryPolicy retryPolicy) 
{
+                          Integer connectionPoolSize, RetryPolicy retryPolicy,
+                          boolean enableTls, Optional<File> tlsCertificate) {
         this.host = host;
         this.port = port;
         this.username = Optional.ofNullable(username);
@@ -67,8 +73,10 @@ public class IggyTcpClient implements IggyBaseClient {
         this.requestTimeout = Optional.ofNullable(requestTimeout);
         this.connectionPoolSize = Optional.ofNullable(connectionPoolSize);
         this.retryPolicy = Optional.ofNullable(retryPolicy);
+        this.enableTls = enableTls;
+        this.tlsCertificate = tlsCertificate;
 
-        InternalTcpClient tcpClient = new InternalTcpClient(host, port);
+        InternalTcpClient tcpClient = new InternalTcpClient(host, port, 
enableTls, tlsCertificate);
         tcpClient.connect();
         usersClient = new UsersTcpClient(tcpClient);
         streamsClient = new StreamsTcpClient(tcpClient);
@@ -152,6 +160,8 @@ public class IggyTcpClient implements IggyBaseClient {
         private Duration requestTimeout;
         private Integer connectionPoolSize;
         private RetryPolicy retryPolicy;
+        private boolean enableTls = false;
+        private File tlsCertificate;
 
         private Builder() {
         }
@@ -235,6 +245,49 @@ public class IggyTcpClient implements IggyBaseClient {
             return this;
         }
 
+        /**
+         * Enables or disables TLS for the TCP connection.
+         *
+         * @param enableTls whether to enable TLS
+         * @return this builder
+         */
+        public Builder tls(boolean enableTls) {
+            this.enableTls = enableTls;
+            return this;
+        }
+
+        /**
+         * Enables TLS for the TCP connection.
+         *
+         * @return this builder
+         */
+        public Builder enableTls() {
+            this.enableTls = true;
+            return this;
+        }
+
+        /**
+         * Sets a custom trusted certificate (PEM file) to validate the server 
certificate.
+         *
+         * @param tlsCertificate the PEM file containing the certificate or CA 
chain
+         * @return this builder
+         */
+        public Builder tlsCertificate(File tlsCertificate) {
+            this.tlsCertificate = tlsCertificate;
+            return this;
+        }
+
+        /**
+         * Sets a custom trusted certificate (PEM file path) to validate the 
server certificate.
+         *
+         * @param tlsCertificatePath the PEM file path containing the 
certificate or CA chain
+         * @return this builder
+         */
+        public Builder tlsCertificate(String tlsCertificatePath) {
+            this.tlsCertificate = StringUtils.isBlank(tlsCertificatePath) ? 
null : new File(tlsCertificatePath);
+            return this;
+        }
+
         /**
          * Builds and returns a configured IggyTcpClient instance.
          *
@@ -247,8 +300,10 @@ public class IggyTcpClient implements IggyBaseClient {
             if (port == null || port <= 0) {
                 throw new IllegalArgumentException("Port must be a positive 
integer");
             }
+            boolean enableTls = this.enableTls;
             return new IggyTcpClient(host, port, username, password,
-                    connectionTimeout, requestTimeout, connectionPoolSize, 
retryPolicy);
+                    connectionTimeout, requestTimeout, connectionPoolSize, 
retryPolicy,
+                    enableTls, Optional.ofNullable(tlsCertificate));
         }
     }
 
@@ -280,10 +335,10 @@ public class IggyTcpClient implements IggyBaseClient {
         /**
          * Creates a retry policy with exponential backoff and custom 
parameters.
          *
-         * @param maxRetries the maximum number of retries
+         * @param maxRetries   the maximum number of retries
          * @param initialDelay the initial delay before the first retry
-         * @param maxDelay the maximum delay between retries
-         * @param multiplier the multiplier for exponential backoff
+         * @param maxDelay     the maximum delay between retries
+         * @param multiplier   the multiplier for exponential backoff
          * @return a RetryPolicy with custom exponential backoff configuration
          */
         public static RetryPolicy exponentialBackoff(int maxRetries, Duration 
initialDelay, Duration maxDelay, double multiplier) {
@@ -294,7 +349,7 @@ public class IggyTcpClient implements IggyBaseClient {
          * Creates a retry policy with fixed delay.
          *
          * @param maxRetries the maximum number of retries
-         * @param delay the fixed delay between retries
+         * @param delay      the fixed delay between retries
          * @return a RetryPolicy with fixed delay configuration
          */
         public static RetryPolicy fixedDelay(int maxRetries, Duration delay) {
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
index 59b89c3b9..d01190047 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
@@ -23,10 +23,16 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
 import reactor.core.publisher.Mono;
 import reactor.netty.Connection;
 import reactor.netty.tcp.TcpClient;
+
+import javax.net.ssl.SSLException;
+import java.io.File;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -41,10 +47,27 @@ final class InternalTcpClient {
     private Connection connection;
 
     InternalTcpClient(String host, Integer port) {
-        client = TcpClient.create()
+        this(host, port, false, Optional.empty());
+    }
+
+    InternalTcpClient(String host, Integer port, boolean enableTls, 
Optional<File> tlsCertificate) {
+        TcpClient tcpClient = TcpClient.create()
                 .host(host)
                 .port(port)
                 .doOnConnected(conn -> conn.addHandlerLast(new 
IggyResponseDecoder()));
+
+        if (enableTls) {
+            try {
+                SslContextBuilder builder = SslContextBuilder.forClient();
+                tlsCertificate.ifPresent(builder::trustManager);
+                SslContext sslContext = builder.build();
+                tcpClient = tcpClient.secure(sslSpec -> 
sslSpec.sslContext(sslContext));
+            } catch (SSLException e) {
+                throw new RuntimeException("Failed to configure TLS for 
TcpClient", e);
+            }
+        }
+
+        client = tcpClient;
     }
 
     void connect() {
@@ -56,7 +79,9 @@ final class InternalTcpClient {
         return send(code.getValue());
     }
 
-    /** Use {@link #send(CommandCode)} instead. */
+    /**
+     * Use {@link #send(CommandCode)} instead.
+     */
     @Deprecated
     ByteBuf send(int command) {
         return send(command, Unpooled.EMPTY_BUFFER);
@@ -66,7 +91,9 @@ final class InternalTcpClient {
         return send(code.getValue(), payload);
     }
 
-    /** Use {@link #send(CommandCode, ByteBuf)} instead. */
+    /**
+     * Use {@link #send(CommandCode, ByteBuf)} instead.
+     */
     @Deprecated
     ByteBuf send(int command, ByteBuf payload) {
         var payloadSize = payload.readableBytes() + COMMAND_LENGTH;


Reply via email to