This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 40d6715a1 feat(java): add TLS support to TCP connection (#2358)
40d6715a1 is described below
commit 40d6715a115276c215a7f51b2efe178fb123eaf0
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Mon Nov 17 13:25:07 2025 +0100
feat(java): add TLS support to TCP connection (#2358)
---
core/certs/{iggy_cert.pem => iggy_ca_cert.pem} | 0
core/certs/iggy_cert.pem | 36 ++++----
core/certs/iggy_key.pem | 52 ++++++------
core/connectors/runtime/README.md | 2 +-
core/connectors/runtime/config.toml | 2 +-
core/connectors/runtime/example_config/config.toml | 4 +
.../iggy/client/async/tcp/AsyncIggyTcpClient.java | 99 +++++++++++++++++-----
.../iggy/client/async/tcp/AsyncTcpConnection.java | 92 ++++++++++++++------
.../iggy/client/blocking/tcp/IggyTcpClient.java | 71 ++++++++++++++--
.../client/blocking/tcp/InternalTcpClient.java | 33 +++++++-
10 files changed, 283 insertions(+), 108 deletions(-)
diff --git a/core/certs/iggy_cert.pem b/core/certs/iggy_ca_cert.pem
similarity index 100%
copy from core/certs/iggy_cert.pem
copy to core/certs/iggy_ca_cert.pem
diff --git a/core/certs/iggy_cert.pem b/core/certs/iggy_cert.pem
index 2ff51e122..db2ab1360 100644
--- a/core/certs/iggy_cert.pem
+++ b/core/certs/iggy_cert.pem
@@ -1,21 +1,21 @@
-----BEGIN CERTIFICATE-----
-MIIDezCCAmOgAwIBAgIUFXqPQOAPI6HS7GBAK0JF2k5OIv0wDQYJKoZIhvcNAQEL
+MIIDaDCCAlCgAwIBAgIUIiNw0u8mef9MUpyav3fBW2WPDpcwDQYJKoZIhvcNAQEL
BQAwPzELMAkGA1UEBhMCUEwxDTALBgNVBAoMBElnZ3kxDTALBgNVBAsMBElnZ3kx
-EjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0yNTA3MDkxMTM5MzdaFw0zNTA3MDcxMTM5
-MzdaMD8xCzAJBgNVBAYTAlBMMQ0wCwYDVQQKDARJZ2d5MQ0wCwYDVQQLDARJZ2d5
-MRIwEAYDVQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
-AoIBAQDD/vCGnd4anscfJ1p1PgAiMPIy1KH9/gHIIN1SYq8SjAyz0YR09POpMmIq
-B2UlrPCE86mE/80Ai/qsfsxe9T3r+Ymnb9wThuAbOmtu8Lpu5xcq34lnqUieaBVj
-qFT2h+qM2ppLYIHgDAeNdcKtCSOQF0hq4jtyszKmyOccn4gzWtAf+3P9wtJ/kWGu
-8zACxPERo5/sFym2DvHB/8YwOp6ypsWqi4lMwoM29L7AsIA/+QN9JQBrITGTWYTS
-ZNvVOrZJCzxlkppxvTOmPa92BVM3Ks/eKz7I0YMO/lGzRod77VnvnSayBVXQKsoz
-XgBNozmHdikaCZBJevJhppPArakjAgMBAAGjbzBtMB0GA1UdDgQWBBRpAPK7Vrdo
-oPKKXBrhbT4IG69i8TAfBgNVHSMEGDAWgBRpAPK7VrdooPKKXBrhbT4IG69i8TAP
-BgNVHRMBAf8EBTADAQH/MBoGA1UdEQQTMBGCCWxvY2FsaG9zdIcEfwAAATANBgkq
-hkiG9w0BAQsFAAOCAQEAkzhSsXK23WwBnm/x7lE3K1qGXMiF7DAGlLpaeCoIXuDj
-q28syAxuwPR0lJkcl0NQky53w/cPf3XnknKU2PBbUWCPS3SnTf9oAJKFL+AUv8UD
-4DYjulJJVYivvCrzQJYSNp6qQHE9FW06qCnOqClPBUrPgx4RYD9L139fu9nn64w4
-u05Jm86gXTdQfyPUf2MVegsozYcDzyOncnxt5PLkfpr44qTh3l3drVqabKUuz+yw
-WHl0yhntS4+vF2MHlleXzsVtKQSraTi6ZBEdIATo3jdSpncIg8rLVu6uM01PweJ8
-0NKgTLE3/Q83PMKQ8tErXYHqL4XzKlipSvSa/+/16g==
+EjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0yNTExMTcwOTIxMDhaFw0yNjExMTcwOTIx
+MDhaMBQxEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEP
+ADCCAQoCggEBANnLgvBKLRSXQWJMIO/BM0DSHo6SnCP+RAdsjTLJGSt+ylkJsl7W
+AVEtDXkoNxuyMa2hTqtUaom1sze0heRgcGkXlnnx6hNs23yyyQ/32SIh81RPH0RM
+xi7QGCI9VPLD+aRFiWDIRCit+QEbYfIkS7k2xkIipJsoqpBLJyFMM2z6+KEazJbU
+0Ajxxw81nbOSFSGSMD7dIVgnod+GypMBKhzlsevn5MTcEJ1VPWAnwSLsFUeZ1MrU
+BOi2brwrD7yY79YA7cgZvytTA39fuwawpk7A8NAAgdgqEsD2TOANYx2aOLp1nX7L
+sqYeRc8wiuC3+Y3bMUA+lvG6kGwE4mygarMCAwEAAaOBhjCBgzAJBgNVHRMEAjAA
+MAsGA1UdDwQEAwIFoDATBgNVHSUEDDAKBggrBgEFBQcDATAUBgNVHREEDTALggls
+b2NhbGhvc3QwHQYDVR0OBBYEFPYlnpnKLvLJJ74clCV5h/HCq/uHMB8GA1UdIwQY
+MBaAFGkA8rtWt2ig8opcGuFtPggbr2LxMA0GCSqGSIb3DQEBCwUAA4IBAQAUM31s
+HbYJ7F4ywyPyScHTtNvk9n4dDhpakjpEqk7W0sMiNtwld083EMw8pa6HEfzouhIx
+SIZR57Ft48TYFUyB9rFYUFauXjTiJmJVGSSkMtQU7xSRrOQ73S6Y2GFKDnVxJA+k
+I6IJvWpMMylbPpDrF1ye/s4ar4kL0r7Xt/bEgIKyTIlmYLeG9UQH72bSn2R3GVnG
+/ta/AX+V//fYXGXFIX1WqwHSzgb7lkrRKgyezM1qUM9yYf/Do/zxdAxxnYukg0mG
+wP/239ntWuh3n2Pq20g3PvdhvRDd4bFqBTMFppnS5Pc4/Y/RV6xhCmWZaYrFWNQd
+LaQdtG8xF3PV+1g4
-----END CERTIFICATE-----
diff --git a/core/certs/iggy_key.pem b/core/certs/iggy_key.pem
index 15df37f10..58826332b 100644
--- a/core/certs/iggy_key.pem
+++ b/core/certs/iggy_key.pem
@@ -1,28 +1,28 @@
-----BEGIN PRIVATE KEY-----
-MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDD/vCGnd4anscf
-J1p1PgAiMPIy1KH9/gHIIN1SYq8SjAyz0YR09POpMmIqB2UlrPCE86mE/80Ai/qs
-fsxe9T3r+Ymnb9wThuAbOmtu8Lpu5xcq34lnqUieaBVjqFT2h+qM2ppLYIHgDAeN
-dcKtCSOQF0hq4jtyszKmyOccn4gzWtAf+3P9wtJ/kWGu8zACxPERo5/sFym2DvHB
-/8YwOp6ypsWqi4lMwoM29L7AsIA/+QN9JQBrITGTWYTSZNvVOrZJCzxlkppxvTOm
-Pa92BVM3Ks/eKz7I0YMO/lGzRod77VnvnSayBVXQKsozXgBNozmHdikaCZBJevJh
-ppPArakjAgMBAAECggEACtymF8/H71G9V0ynBrhY6Yws4ARItfrNvnmTym0Npsl8
-KrsBzDmHB2lzZ0DhqiTbb4LRcj167vzS95xPVzPgKqO5rT3VxH9FDD1AlvVgX6UO
-86FPzB7ll2HNTaiWjNMbfJLg2iyPBgukn4JXN5+6CNJBgSrogILN29Bmfn+Y3L+i
-nUlSoUXk2M0+7cRiwPWAYv9PXf9DuG5q53cKdnLff0brik223FvcVGxUNI1N6Lae
-o1f3SwVP5eGlh8EcRQJn4l1WQfPt/NQ1BiT341aP91Or9WKYM9kSritZTZO55Thd
-FblK0FHYZ5J4EWh8ThDz0mGaktloIdE/KTx2djd8gQKBgQDEMK6fQJqNhS7kBD11
-L3nBZTJUNpmLM9WjvnwfcTP/S+t85XbnVE5ITCmVPKz/chqol0ZDMMO11K+W3APd
-+ZQ12O8buKHgT6tTlKoiOP0uwbnAdLjLZOWsQVddbhBErKPRu+G+5nyHVPTtdQG7
-qEYoe2xg/PNLr5/arNGLHUJixQKBgQD/vxfVHqvJBGMS4bx1p9txnKzmZqT4L79F
-tG83RotHSPUqO/hbVv5U1eI8oBnifptMqaI38Uzs2+Bilg1BF4PCo55W25aC0dg6
-r+KiFIq6mUX5WKH8han4IB34JvjV6jGvzuma/oTO8fxfSimuMWX0nkODAb2HeZ4f
-/VEbnw16xwKBgQCM2dKUfOI41jRK+YxR9Iq6QPf4I9bqbIVl8JzWSgSlthDT8z4B
-aLJnD18PSKd/IaFoBmsoU/s+MPr7GOwh0kwIuL05rr1w+GGtON9IgJesmOLN/D6r
-r72AhMy/RASj+ToHmpbA4mLnGiRZ0pYy7uWnrKyrmQ7m+KiQa2BpOtFtbQKBgBwV
-wys/VAQePDcNnSGajmm6l+4xZXpv2+RuvSvnzlHEvE+oCE0Xj5SHbHHV7yHFX46a
-rlrQX+8+8jRBYDE+wNR2HWRSdwPkwYcoW44LDXUScfHA/wD5OMIr3L6soPT05AH3
-igXgX3tObbWVMmCTwiuL2cQgQ80F2QyQSADthZBvAoGBAKRxpTDwBLZ9rhQUAUIy
-/d0htT6eNG3fqau2WcDfvfQ4ykdFieOCpZH9OYdCDqPDB0g/RZ6dUOKpjH3sTpdJ
-u5ZsQ9iqCvE2MHplqdmAn7H3sI1FKeh1TN68voa85Y+KsDurJQ4X3pNRmWinntQs
-+PYpwYdDwwwaxnUeTgPUEjph
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDZy4LwSi0Ul0Fi
+TCDvwTNA0h6Okpwj/kQHbI0yyRkrfspZCbJe1gFRLQ15KDcbsjGtoU6rVGqJtbM3
+tIXkYHBpF5Z58eoTbNt8sskP99kiIfNUTx9ETMYu0BgiPVTyw/mkRYlgyEQorfkB
+G2HyJEu5NsZCIqSbKKqQSychTDNs+vihGsyW1NAI8ccPNZ2zkhUhkjA+3SFYJ6Hf
+hsqTASoc5bHr5+TE3BCdVT1gJ8Ei7BVHmdTK1ATotm68Kw+8mO/WAO3IGb8rUwN/
+X7sGsKZOwPDQAIHYKhLA9kzgDWMdmji6dZ1+y7KmHkXPMIrgt/mN2zFAPpbxupBs
+BOJsoGqzAgMBAAECggEAB3SnjVlEtM0+wEIx6HM5MXFf+Wp8bOEB7C0jokbvFSWY
+gLbLw+JYljJQIUMmq8yMVdDNVCEmKoOtWG1WHgzNHFPh41fMNxEFbH3kUycdaEU4
+Qr0YqWplaGnYQafO1iauT5jPzJ/ecXQPL6ID1tm2O+drnwz3jJY2TI0+EC4/Hm5i
+yRBft++4WHz6wtQBzC63KUW4OqtpNVbLRt4bbOFsLLnkD6OmL/dxgRZiiMVpPZe6
+48lJZ+2HIAVnsii9sUeYJFvUbxbw/RwI5WmEq232YXk06yQxq2Ww5hTMF7CBxPUd
+9oF+juV0KaJ7tlbp1/dSjmB/YipVEOkgxEpLYifo0QKBgQDa9d0DQEywBnq7RbKM
+t0vdZOdrin+S2sIHsF6Or5wTqA3eDOSmHwQ3/wXkjLksKDv0jOn32IuOdJTAhgXH
+WTlilYhBGMK5KOjGQuKIBLeV+kmA/BhWsjXfJruNdBO9ZNcq0uADWZoWD/801Glx
+QLkcFGoAp+1mGBqVfmNAW12rowKBgQD+oy243wLfWBDHAJhj3OSseGMqcadXSlVi
+xEByxnxql/sN1ZMnaev9TOguEyrNAWx20kl8nYJAiEShMii5ESBJwz3gNq+BPVlL
+Kdeeuyd4WJ8Z+rDKfSXckIILiG8C85jmlKIW7ymeD0lq6p4AIr5kU5wB6A26F/Zl
+dPSu95w1sQKBgQC1lndPbfDrfsQkMV4tQwogtsERt8+rK88Eb2lL7imDR7kQcSLi
+/hASnGX3sBkVnNx0KLUUvbqnTtnafIuoUr+7mYVhbzZ2No4tdmTGJxtVvzdcSDWr
+GKqCwW2Dl0OTq2CifDYZsSKPr36YApzbtrCNsARlPN8t70mEt0d6pQse/wKBgCw4
+QDdrmv8YNwmoA87LzHZbDbWQOSRCh2N4e0yzRWXpcLKtVTmx/kZltahSscsaJTDZ
+QeO/IqB8SZbItCO3YqkMm9E1DfNeqQQRhx1MmQUgNrj3PP5dD1cnTT4delHD5did
+FSzwaYTGWNSVW8zgO7oGfwAhwUiA4swprFg6LclxAoGBALv07o4gyP9bYu/lVTYb
+ygQduofL+ZIiqm1bbnhaA8TtqR62YfjPdMVt39LxNvIsba12Kq1ouapXcRI8J99M
+iA7KXgjAkhtqBpOiBFeLFq9giVQkLjCMM2v5zfFfQ+K2Coij/QNG4WJiRwroHDw+
+ZHX4bn6EJY3cVDMDPXmOxMEj
-----END PRIVATE KEY-----
diff --git a/core/connectors/runtime/README.md
b/core/connectors/runtime/README.md
index a70c62b9d..483047a4f 100644
--- a/core/connectors/runtime/README.md
+++ b/core/connectors/runtime/README.md
@@ -25,7 +25,7 @@ token = "" # Personal Access Token (PAT) can be used instead
of username and pas
[iggy.tls] # Optional TLS configuration for Iggy TCP connection
enabled = false
-ca_file = "core/certs/iggy_cert.pem"
+ca_file = "core/certs/iggy_ca_cert.pem"
domain = "" # Optional domain for TLS connection
[state]
diff --git a/core/connectors/runtime/config.toml
b/core/connectors/runtime/config.toml
index fef75d678..dadcc0f35 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -42,7 +42,7 @@ token = "" # Personal Access Token (PAT) can be used instead
of username and pas
[iggy.tls] # Optional TLS configuration for Iggy TCP connection
enabled = false
-ca_file = "core/certs/iggy_cert.pem"
+ca_file = "core/certs/iggy_ca_cert.pem"
domain = "" # Optional domain for TLS connection
[state]
diff --git a/core/connectors/runtime/example_config/config.toml
b/core/connectors/runtime/example_config/config.toml
index 431f475ad..6bcbc9b22 100644
--- a/core/connectors/runtime/example_config/config.toml
+++ b/core/connectors/runtime/example_config/config.toml
@@ -40,6 +40,10 @@ username = "iggy"
password = "iggy"
token = "" # Personal Access Token (PAT) can be used instead of username and
password
+[iggy.tls]
+enabled = false
+ca_file = "core/certs/iggy_ca_cert.pem"
+
[state]
path = "local_state"
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
index 6cada2006..4c8bf92b9 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
@@ -19,12 +19,14 @@
package org.apache.iggy.client.async.tcp;
+import org.apache.commons.lang3.StringUtils;
import org.apache.iggy.client.async.ConsumerGroupsClient;
import org.apache.iggy.client.async.MessagesClient;
import org.apache.iggy.client.async.StreamsClient;
import org.apache.iggy.client.async.TopicsClient;
import org.apache.iggy.client.async.UsersClient;
+import java.io.File;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -43,6 +45,8 @@ public class AsyncIggyTcpClient {
private final Optional<Duration> requestTimeout;
private final Optional<Integer> connectionPoolSize;
private final Optional<RetryPolicy> retryPolicy;
+ private final boolean enableTls;
+ private final Optional<File> tlsCertificate;
private AsyncTcpConnection connection;
private MessagesClient messagesClient;
private ConsumerGroupsClient consumerGroupsClient;
@@ -51,12 +55,13 @@ public class AsyncIggyTcpClient {
private UsersClient usersClient;
public AsyncIggyTcpClient(String host, int port) {
- this(host, port, null, null, null, null, null, null);
+ this(host, port, null, null, null, null, null, null, false,
Optional.empty());
}
private AsyncIggyTcpClient(String host, int port, String username, String
password,
Duration connectionTimeout, Duration
requestTimeout,
- Integer connectionPoolSize, RetryPolicy
retryPolicy) {
+ Integer connectionPoolSize, RetryPolicy
retryPolicy,
+ boolean enableTls, Optional<File>
tlsCertificate) {
this.host = host;
this.port = port;
this.username = Optional.ofNullable(username);
@@ -65,6 +70,8 @@ public class AsyncIggyTcpClient {
this.requestTimeout = Optional.ofNullable(requestTimeout);
this.connectionPoolSize = Optional.ofNullable(connectionPoolSize);
this.retryPolicy = Optional.ofNullable(retryPolicy);
+ this.enableTls = enableTls;
+ this.tlsCertificate = tlsCertificate;
}
/**
@@ -80,23 +87,23 @@ public class AsyncIggyTcpClient {
* Connects to the Iggy server asynchronously.
*/
public CompletableFuture<Void> connect() {
- connection = new AsyncTcpConnection(host, port);
+ connection = new AsyncTcpConnection(host, port, enableTls,
tlsCertificate);
return connection.connect()
- .thenRun(() -> {
- messagesClient = new MessagesTcpClient(connection);
- consumerGroupsClient = new ConsumerGroupsTcpClient(connection);
- streamsClient = new StreamsTcpClient(connection);
- topicsClient = new TopicsTcpClient(connection);
- usersClient = new UsersTcpClient(connection);
- })
- .thenCompose(v -> {
- // Auto-login if credentials are provided
- if (username.isPresent() && password.isPresent()) {
- return usersClient.loginAsync(username.get(),
password.get())
- .thenApply(identity -> null);
- }
- return CompletableFuture.completedFuture(null);
- });
+ .thenRun(() -> {
+ messagesClient = new MessagesTcpClient(connection);
+ consumerGroupsClient = new
ConsumerGroupsTcpClient(connection);
+ streamsClient = new StreamsTcpClient(connection);
+ topicsClient = new TopicsTcpClient(connection);
+ usersClient = new UsersTcpClient(connection);
+ })
+ .thenCompose(v -> {
+ // Auto-login if credentials are provided
+ if (username.isPresent() && password.isPresent()) {
+ return usersClient.loginAsync(username.get(),
password.get())
+ .thenApply(identity -> null);
+ }
+ return CompletableFuture.completedFuture(null);
+ });
}
/**
@@ -171,6 +178,8 @@ public class AsyncIggyTcpClient {
private Duration requestTimeout;
private Integer connectionPoolSize;
private RetryPolicy retryPolicy;
+ private boolean enableTls = false;
+ private File tlsTrustedCertificatePem;
private Builder() {
}
@@ -254,6 +263,49 @@ public class AsyncIggyTcpClient {
return this;
}
+ /**
+ * Enables or disables TLS for the TCP connection.
+ *
+ * @param enableTls whether to enable TLS
+ * @return this builder
+ */
+ public Builder tls(boolean enableTls) {
+ this.enableTls = enableTls;
+ return this;
+ }
+
+ /**
+ * Enables TLS for the TCP connection.
+ *
+ * @return this builder
+ */
+ public Builder enableTls() {
+ this.enableTls = true;
+ return this;
+ }
+
+ /**
+ * Sets a custom trusted certificate (PEM file) to validate the server
certificate.
+ *
+ * @param certificate the PEM file containing the certificate or CA
chain
+ * @return this builder
+ */
+ public Builder tlsTrustedCertificate(File certificate) {
+ this.tlsTrustedCertificatePem = certificate;
+ return this;
+ }
+
+ /**
+ * Sets a custom trusted certificate (PEM file path) to validate the
server certificate.
+ *
+ * @param certificatePath the PEM file path containing the certificate
or CA chain
+ * @return this builder
+ */
+ public Builder tlsTrustedCertificate(String certificatePath) {
+ this.tlsTrustedCertificatePem =
StringUtils.isBlank(certificatePath) ? null : new File(certificatePath);
+ return this;
+ }
+
/**
* Builds and returns a configured AsyncIggyTcpClient instance.
* Note: You still need to call connect() on the returned client.
@@ -268,7 +320,8 @@ public class AsyncIggyTcpClient {
throw new IllegalArgumentException("Port must be a positive
integer");
}
return new AsyncIggyTcpClient(host, port, username, password,
- connectionTimeout, requestTimeout, connectionPoolSize,
retryPolicy);
+ connectionTimeout, requestTimeout, connectionPoolSize,
retryPolicy,
+ enableTls, Optional.ofNullable(tlsTrustedCertificatePem));
}
}
@@ -300,10 +353,10 @@ public class AsyncIggyTcpClient {
/**
* Creates a retry policy with exponential backoff and custom
parameters.
*
- * @param maxRetries the maximum number of retries
+ * @param maxRetries the maximum number of retries
* @param initialDelay the initial delay before the first retry
- * @param maxDelay the maximum delay between retries
- * @param multiplier the multiplier for exponential backoff
+ * @param maxDelay the maximum delay between retries
+ * @param multiplier the multiplier for exponential backoff
* @return a RetryPolicy with custom exponential backoff configuration
*/
public static RetryPolicy exponentialBackoff(int maxRetries, Duration
initialDelay, Duration maxDelay, double multiplier) {
@@ -314,7 +367,7 @@ public class AsyncIggyTcpClient {
* Creates a retry policy with fixed delay.
*
* @param maxRetries the maximum number of retries
- * @param delay the fixed delay between retries
+ * @param delay the fixed delay between retries
* @return a RetryPolicy with fixed delay configuration
*/
public static RetryPolicy fixedDelay(int maxRetries, Duration delay) {
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
index c1f5d14a3..faa1c8ba2 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
@@ -21,13 +21,25 @@ package org.apache.iggy.client.async.tcp;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.*;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLException;
+import java.io.File;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -41,6 +53,9 @@ public class AsyncTcpConnection {
private final String host;
private final int port;
+ private final boolean enableTls;
+ private final Optional<File> tlsCertificate;
+ private final SslContext sslContext;
private final EventLoopGroup eventLoopGroup;
private final Bootstrap bootstrap;
private Channel channel;
@@ -48,35 +63,56 @@ public class AsyncTcpConnection {
private final ConcurrentHashMap<Long, CompletableFuture<ByteBuf>>
pendingRequests = new ConcurrentHashMap<>();
public AsyncTcpConnection(String host, int port) {
+ this(host, port, false, Optional.empty());
+ }
+
+ public AsyncTcpConnection(String host, int port, boolean enableTls,
Optional<File> tlsCertificate) {
this.host = host;
this.port = port;
+ this.enableTls = enableTls;
+ this.tlsCertificate = tlsCertificate;
this.eventLoopGroup = new NioEventLoopGroup();
this.bootstrap = new Bootstrap();
+ if (this.enableTls) {
+ try {
+ SslContextBuilder builder = SslContextBuilder.forClient();
+ this.tlsCertificate.ifPresent(builder::trustManager);
+ this.sslContext = builder.build();
+ } catch (SSLException e) {
+ throw new RuntimeException("Failed to build SSL context for
AsyncTcpConnection", e);
+ }
+ } else {
+ this.sslContext = null;
+ }
configureBootstrap();
}
private void configureBootstrap() {
bootstrap.group(eventLoopGroup)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
-
- // Custom frame decoder for Iggy protocol responses
- pipeline.addLast("frameDecoder", new IggyFrameDecoder());
-
- // No encoder needed - we build complete frames following
Iggy protocol
- // The protocol already includes the length field, so
adding an encoder
- // would duplicate it. This matches the blocking client
implementation.
-
- // Response handler
- pipeline.addLast("responseHandler",
- new IggyResponseHandler(pendingRequests));
- }
- });
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+
+ if (enableTls) {
+ pipeline.addLast("ssl",
sslContext.newHandler(ch.alloc(), host, port));
+ }
+
+ // Custom frame decoder for Iggy protocol responses
+ pipeline.addLast("frameDecoder", new
IggyFrameDecoder());
+
+ // No encoder needed - we build complete frames
following Iggy protocol
+ // The protocol already includes the length field, so
adding an encoder
+ // would duplicate it. This matches the blocking
client implementation.
+
+ // Response handler
+ pipeline.addLast("responseHandler",
+ new IggyResponseHandler(pendingRequests));
+ }
+ });
}
/**
@@ -103,7 +139,7 @@ public class AsyncTcpConnection {
public CompletableFuture<ByteBuf> sendAsync(int commandCode, ByteBuf
payload) {
if (channel == null || !channel.isActive()) {
return CompletableFuture.failedFuture(
- new IllegalStateException("Connection not established or
closed"));
+ new IllegalStateException("Connection not established or
closed"));
}
// Since Iggy doesn't use request IDs, we'll just use a simple queue
@@ -132,7 +168,7 @@ public class AsyncTcpConnection {
hex.append(String.format("%02x ", b));
}
logger.trace("Sending frame with command: {}, payload size: {},
frame payload size (with command): {}, total frame size: {}",
- commandCode, payloadSize, framePayloadSize,
frame.readableBytes());
+ commandCode, payloadSize, framePayloadSize,
frame.readableBytes());
logger.trace("Frame bytes (hex): {}", hex.toString());
}
@@ -195,8 +231,8 @@ public class AsyncTcpConnection {
// Get the oldest pending request
if (!pendingRequests.isEmpty()) {
Long oldestRequestId = pendingRequests.keySet().stream()
- .min(Long::compare)
- .orElse(null);
+ .min(Long::compare)
+ .orElse(null);
if (oldestRequestId != null) {
CompletableFuture<ByteBuf> future =
pendingRequests.remove(oldestRequestId);
@@ -210,10 +246,10 @@ public class AsyncTcpConnection {
byte[] errorBytes = new byte[length];
msg.readBytes(errorBytes);
future.completeExceptionally(
- new RuntimeException("Server error: " + new
String(errorBytes)));
+ new RuntimeException("Server error: " +
new String(errorBytes)));
} else {
future.completeExceptionally(
- new RuntimeException("Server error with
status: " + status));
+ new RuntimeException("Server error with
status: " + status));
}
}
}
@@ -224,7 +260,7 @@ public class AsyncTcpConnection {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) {
// Fail all pending requests
pendingRequests.values().forEach(future ->
- future.completeExceptionally(cause));
+ future.completeExceptionally(cause));
pendingRequests.clear();
ctx.close();
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java
index 3e389057e..ed39ad945 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java
@@ -19,6 +19,7 @@
package org.apache.iggy.client.blocking.tcp;
+import org.apache.commons.lang3.StringUtils;
import org.apache.iggy.client.blocking.ConsumerGroupsClient;
import org.apache.iggy.client.blocking.ConsumerOffsetsClient;
import org.apache.iggy.client.blocking.IggyBaseClient;
@@ -29,6 +30,8 @@ import org.apache.iggy.client.blocking.StreamsClient;
import org.apache.iggy.client.blocking.SystemClient;
import org.apache.iggy.client.blocking.TopicsClient;
import org.apache.iggy.client.blocking.UsersClient;
+
+import java.io.File;
import java.time.Duration;
import java.util.Optional;
@@ -51,14 +54,17 @@ public class IggyTcpClient implements IggyBaseClient {
private final Optional<Duration> requestTimeout;
private final Optional<Integer> connectionPoolSize;
private final Optional<RetryPolicy> retryPolicy;
+ private final boolean enableTls;
+ private final Optional<File> tlsCertificate;
public IggyTcpClient(String host, Integer port) {
- this(host, port, null, null, null, null, null, null);
+ this(host, port, null, null, null, null, null, null, false,
Optional.empty());
}
private IggyTcpClient(String host, Integer port, String username, String
password,
Duration connectionTimeout, Duration requestTimeout,
- Integer connectionPoolSize, RetryPolicy retryPolicy)
{
+ Integer connectionPoolSize, RetryPolicy retryPolicy,
+ boolean enableTls, Optional<File> tlsCertificate) {
this.host = host;
this.port = port;
this.username = Optional.ofNullable(username);
@@ -67,8 +73,10 @@ public class IggyTcpClient implements IggyBaseClient {
this.requestTimeout = Optional.ofNullable(requestTimeout);
this.connectionPoolSize = Optional.ofNullable(connectionPoolSize);
this.retryPolicy = Optional.ofNullable(retryPolicy);
+ this.enableTls = enableTls;
+ this.tlsCertificate = tlsCertificate;
- InternalTcpClient tcpClient = new InternalTcpClient(host, port);
+ InternalTcpClient tcpClient = new InternalTcpClient(host, port,
enableTls, tlsCertificate);
tcpClient.connect();
usersClient = new UsersTcpClient(tcpClient);
streamsClient = new StreamsTcpClient(tcpClient);
@@ -152,6 +160,8 @@ public class IggyTcpClient implements IggyBaseClient {
private Duration requestTimeout;
private Integer connectionPoolSize;
private RetryPolicy retryPolicy;
+ private boolean enableTls = false;
+ private File tlsCertificate;
private Builder() {
}
@@ -235,6 +245,49 @@ public class IggyTcpClient implements IggyBaseClient {
return this;
}
+ /**
+ * Enables or disables TLS for the TCP connection.
+ *
+ * @param enableTls whether to enable TLS
+ * @return this builder
+ */
+ public Builder tls(boolean enableTls) {
+ this.enableTls = enableTls;
+ return this;
+ }
+
+ /**
+ * Enables TLS for the TCP connection.
+ *
+ * @return this builder
+ */
+ public Builder enableTls() {
+ this.enableTls = true;
+ return this;
+ }
+
+ /**
+ * Sets a custom trusted certificate (PEM file) to validate the server
certificate.
+ *
+ * @param tlsCertificate the PEM file containing the certificate or CA
chain
+ * @return this builder
+ */
+ public Builder tlsCertificate(File tlsCertificate) {
+ this.tlsCertificate = tlsCertificate;
+ return this;
+ }
+
+ /**
+ * Sets a custom trusted certificate (PEM file path) to validate the
server certificate.
+ *
+ * @param tlsCertificatePath the PEM file path containing the
certificate or CA chain
+ * @return this builder
+ */
+ public Builder tlsCertificate(String tlsCertificatePath) {
+ this.tlsCertificate = StringUtils.isBlank(tlsCertificatePath) ?
null : new File(tlsCertificatePath);
+ return this;
+ }
+
/**
* Builds and returns a configured IggyTcpClient instance.
*
@@ -247,8 +300,10 @@ public class IggyTcpClient implements IggyBaseClient {
if (port == null || port <= 0) {
throw new IllegalArgumentException("Port must be a positive
integer");
}
+ boolean enableTls = this.enableTls;
return new IggyTcpClient(host, port, username, password,
- connectionTimeout, requestTimeout, connectionPoolSize,
retryPolicy);
+ connectionTimeout, requestTimeout, connectionPoolSize,
retryPolicy,
+ enableTls, Optional.ofNullable(tlsCertificate));
}
}
@@ -280,10 +335,10 @@ public class IggyTcpClient implements IggyBaseClient {
/**
* Creates a retry policy with exponential backoff and custom
parameters.
*
- * @param maxRetries the maximum number of retries
+ * @param maxRetries the maximum number of retries
* @param initialDelay the initial delay before the first retry
- * @param maxDelay the maximum delay between retries
- * @param multiplier the multiplier for exponential backoff
+ * @param maxDelay the maximum delay between retries
+ * @param multiplier the multiplier for exponential backoff
* @return a RetryPolicy with custom exponential backoff configuration
*/
public static RetryPolicy exponentialBackoff(int maxRetries, Duration
initialDelay, Duration maxDelay, double multiplier) {
@@ -294,7 +349,7 @@ public class IggyTcpClient implements IggyBaseClient {
* Creates a retry policy with fixed delay.
*
* @param maxRetries the maximum number of retries
- * @param delay the fixed delay between retries
+ * @param delay the fixed delay between retries
* @return a RetryPolicy with fixed delay configuration
*/
public static RetryPolicy fixedDelay(int maxRetries, Duration delay) {
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
index 59b89c3b9..d01190047 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
@@ -23,10 +23,16 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
+
+import javax.net.ssl.SSLException;
+import java.io.File;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -41,10 +47,27 @@ final class InternalTcpClient {
private Connection connection;
InternalTcpClient(String host, Integer port) {
- client = TcpClient.create()
+ this(host, port, false, Optional.empty());
+ }
+
+ InternalTcpClient(String host, Integer port, boolean enableTls,
Optional<File> tlsCertificate) {
+ TcpClient tcpClient = TcpClient.create()
.host(host)
.port(port)
.doOnConnected(conn -> conn.addHandlerLast(new
IggyResponseDecoder()));
+
+ if (enableTls) {
+ try {
+ SslContextBuilder builder = SslContextBuilder.forClient();
+ tlsCertificate.ifPresent(builder::trustManager);
+ SslContext sslContext = builder.build();
+ tcpClient = tcpClient.secure(sslSpec ->
sslSpec.sslContext(sslContext));
+ } catch (SSLException e) {
+ throw new RuntimeException("Failed to configure TLS for
TcpClient", e);
+ }
+ }
+
+ client = tcpClient;
}
void connect() {
@@ -56,7 +79,9 @@ final class InternalTcpClient {
return send(code.getValue());
}
- /** Use {@link #send(CommandCode)} instead. */
+ /**
+ * Use {@link #send(CommandCode)} instead.
+ */
@Deprecated
ByteBuf send(int command) {
return send(command, Unpooled.EMPTY_BUFFER);
@@ -66,7 +91,9 @@ final class InternalTcpClient {
return send(code.getValue(), payload);
}
- /** Use {@link #send(CommandCode, ByteBuf)} instead. */
+ /**
+ * Use {@link #send(CommandCode, ByteBuf)} instead.
+ */
@Deprecated
ByteBuf send(int command, ByteBuf payload) {
var payloadSize = payload.readableBytes() + COMMAND_LENGTH;