This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f8130f  feat: support sasl/plain auth (#375)
5f8130f is described below

commit 5f8130f26957a667eb6b9b33cf3b83841e87e5f2
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Feb 28 11:40:02 2026 +0000

    feat: support sasl/plain auth (#375)
---
 .github/workflows/build_and_test_rust.yml          |   2 +-
 bindings/cpp/include/fluss.hpp                     |  10 +
 bindings/cpp/src/ffi_converter.hpp                 |   5 +
 bindings/cpp/src/lib.rs                            |  13 +
 bindings/cpp/test/test_sasl_auth.cpp               | 125 +++++++++
 bindings/cpp/test/test_utils.h                     | 151 +++++++----
 bindings/python/src/config.rs                      |  77 ++++++
 bindings/python/test/conftest.py                   |  57 +++-
 bindings/python/test/test_sasl_auth.py             | 108 ++++++++
 crates/fluss/src/client/connection.rs              |  18 +-
 crates/fluss/src/config.rs                         | 166 +++++++++++-
 crates/fluss/src/proto/fluss_api.proto             |  11 +-
 crates/fluss/src/rpc/api_key.rs                    |   4 +
 crates/fluss/src/rpc/message/authenticate.rs       |  86 ++++++
 crates/fluss/src/rpc/message/mod.rs                |   2 +
 crates/fluss/src/rpc/server_connection.rs          | 107 +++++++-
 crates/fluss/tests/integration/admin.rs            |  52 +---
 crates/fluss/tests/integration/fluss_cluster.rs    | 290 +++++++++++++++++----
 crates/fluss/tests/integration/kv_table.rs         |  43 +--
 crates/fluss/tests/integration/log_table.rs        | 107 ++++----
 crates/fluss/tests/integration/sasl_auth.rs        | 149 +++++++++++
 .../fluss/tests/integration/table_remote_scan.rs   |  95 +------
 crates/fluss/tests/integration/utils.rs            | 141 ++++++----
 crates/fluss/tests/test_fluss.rs                   |   1 +
 website/docs/user-guide/cpp/error-handling.md      |  21 ++
 .../docs/user-guide/cpp/example/configuration.md   |  39 ++-
 website/docs/user-guide/python/error-handling.md   |  19 ++
 .../user-guide/python/example/configuration.md     |  26 +-
 website/docs/user-guide/rust/error-handling.md     |  19 ++
 .../docs/user-guide/rust/example/configuration.md  |  46 +++-
 30 files changed, 1560 insertions(+), 430 deletions(-)

diff --git a/.github/workflows/build_and_test_rust.yml 
b/.github/workflows/build_and_test_rust.yml
index c9e05b7..1bf7bc5 100644
--- a/.github/workflows/build_and_test_rust.yml
+++ b/.github/workflows/build_and_test_rust.yml
@@ -79,7 +79,7 @@ jobs:
       - name: Integration Test (Linux only)
         if: runner.os == 'Linux'
         run: |
-          RUST_TEST_THREADS=1 cargo test --features integration_tests 
--all-targets --workspace --exclude fluss_python --exclude fluss-cpp -- 
--nocapture
+          cargo test --features integration_tests --all-targets --workspace 
--exclude fluss_python --exclude fluss-cpp
         env:
           RUST_LOG: DEBUG
           RUST_BACKTRACE: full
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index cb06028..b507da7 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -998,6 +998,16 @@ struct Configuration {
     // Maximum number of records returned in a single call to Poll() for 
LogScanner
     size_t scanner_log_max_poll_records{500};
     int64_t writer_batch_timeout_ms{100};
+    // Connect timeout in milliseconds for TCP transport connect
+    uint64_t connect_timeout_ms{120000};
+    // Security protocol: "PLAINTEXT" (default, no auth) or "sasl" (SASL auth)
+    std::string security_protocol{"PLAINTEXT"};
+    // SASL mechanism (only "PLAIN" is supported)
+    std::string security_sasl_mechanism{"PLAIN"};
+    // SASL username (required when security_protocol is "sasl")
+    std::string security_sasl_username;
+    // SASL password (required when security_protocol is "sasl")
+    std::string security_sasl_password;
 };
 
 class Connection {
diff --git a/bindings/cpp/src/ffi_converter.hpp 
b/bindings/cpp/src/ffi_converter.hpp
index 9020027..3375761 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -57,6 +57,11 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& 
config) {
     ffi_config.scanner_remote_log_read_concurrency = 
config.scanner_remote_log_read_concurrency;
     ffi_config.scanner_log_max_poll_records = 
config.scanner_log_max_poll_records;
     ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
+    ffi_config.connect_timeout_ms = config.connect_timeout_ms;
+    ffi_config.security_protocol = rust::String(config.security_protocol);
+    ffi_config.security_sasl_mechanism = 
rust::String(config.security_sasl_mechanism);
+    ffi_config.security_sasl_username = 
rust::String(config.security_sasl_username);
+    ffi_config.security_sasl_password = 
rust::String(config.security_sasl_password);
     return ffi_config;
 }
 
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index d26af6a..c310fc8 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -49,6 +49,11 @@ mod ffi {
         scanner_remote_log_read_concurrency: usize,
         scanner_log_max_poll_records: usize,
         writer_batch_timeout_ms: i64,
+        connect_timeout_ms: u64,
+        security_protocol: String,
+        security_sasl_mechanism: String,
+        security_sasl_username: String,
+        security_sasl_password: String,
     }
 
     struct FfiResult {
@@ -258,6 +263,9 @@ mod ffi {
         type LookupResultInner;
 
         // Connection
+        // TODO: all Result<*mut T> methods lose server error codes (mapped to 
CLIENT_ERROR).
+        // Fix by introducing  some struct like { result: FfiResult, ptr: i64 
} to preserve error
+        // codes from the server, matching how Rust and Python bindings handle 
errors.
         fn new_connection(config: &FfiConfig) -> Result<*mut Connection>;
         unsafe fn delete_connection(conn: *mut Connection);
         fn get_admin(self: &Connection) -> Result<*mut Admin>;
@@ -645,6 +653,11 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut 
Connection, String> {
         remote_file_download_thread_num: 
config.remote_file_download_thread_num,
         scanner_remote_log_read_concurrency: 
config.scanner_remote_log_read_concurrency,
         scanner_log_max_poll_records: config.scanner_log_max_poll_records,
+        connect_timeout_ms: config.connect_timeout_ms,
+        security_protocol: config.security_protocol.to_string(),
+        security_sasl_mechanism: config.security_sasl_mechanism.to_string(),
+        security_sasl_username: config.security_sasl_username.to_string(),
+        security_sasl_password: config.security_sasl_password.to_string(),
     };
 
     let conn = RUNTIME.block_on(async { 
fcore::client::FlussConnection::new(config_core).await });
diff --git a/bindings/cpp/test/test_sasl_auth.cpp 
b/bindings/cpp/test/test_sasl_auth.cpp
new file mode 100644
index 0000000..2208db3
--- /dev/null
+++ b/bindings/cpp/test/test_sasl_auth.cpp
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "test_utils.h"
+
+class SaslAuthTest : public ::testing::Test {
+   protected:
+    const std::string& sasl_servers() {
+        return 
fluss_test::FlussTestEnvironment::Instance()->GetSaslBootstrapServers();
+    }
+    const std::string& plaintext_servers() {
+        return 
fluss_test::FlussTestEnvironment::Instance()->GetBootstrapServers();
+    }
+};
+
+TEST_F(SaslAuthTest, SaslConnectWithValidCredentials) {
+    fluss::Configuration config;
+    config.bootstrap_servers = sasl_servers();
+    config.security_protocol = "sasl";
+    config.security_sasl_mechanism = "PLAIN";
+    config.security_sasl_username = "admin";
+    config.security_sasl_password = "admin-secret";
+
+    fluss::Connection conn;
+    ASSERT_OK(fluss::Connection::Create(config, conn));
+
+    fluss::Admin admin;
+    ASSERT_OK(conn.GetAdmin(admin));
+
+    // Perform a basic operation to confirm the connection is fully functional
+    std::string db_name = "cpp_sasl_test_valid_db";
+    fluss::DatabaseDescriptor descriptor;
+    descriptor.comment = "created via SASL auth";
+    ASSERT_OK(admin.CreateDatabase(db_name, descriptor, true));
+
+    bool exists = false;
+    ASSERT_OK(admin.DatabaseExists(db_name, exists));
+    ASSERT_TRUE(exists);
+
+    ASSERT_OK(admin.DropDatabase(db_name, true, true));
+}
+
+TEST_F(SaslAuthTest, SaslConnectWithSecondUser) {
+    fluss::Configuration config;
+    config.bootstrap_servers = sasl_servers();
+    config.security_protocol = "sasl";
+    config.security_sasl_mechanism = "PLAIN";
+    config.security_sasl_username = "alice";
+    config.security_sasl_password = "alice-secret";
+
+    fluss::Connection conn;
+    ASSERT_OK(fluss::Connection::Create(config, conn));
+
+    fluss::Admin admin;
+    ASSERT_OK(conn.GetAdmin(admin));
+
+    // Basic operation to confirm functional connection
+    bool exists = false;
+    ASSERT_OK(admin.DatabaseExists("some_nonexistent_db_alice", exists));
+    ASSERT_FALSE(exists);
+}
+
+TEST_F(SaslAuthTest, SaslConnectWithWrongPassword) {
+    fluss::Configuration config;
+    config.bootstrap_servers = sasl_servers();
+    config.security_protocol = "sasl";
+    config.security_sasl_mechanism = "PLAIN";
+    config.security_sasl_username = "admin";
+    config.security_sasl_password = "wrong-password";
+
+    fluss::Connection conn;
+    auto result = fluss::Connection::Create(config, conn);
+    ASSERT_FALSE(result.Ok());
+    // TODO: error_code is CLIENT_ERROR (-2) because CXX Result<*mut T> loses 
the server
+    // error code. Should be AUTHENTICATE_EXCEPTION (46) once fixed
+    EXPECT_NE(result.error_message.find("Authentication failed"), 
std::string::npos)
+        << "Expected 'Authentication failed' in: " << result.error_message;
+}
+
+TEST_F(SaslAuthTest, SaslConnectWithUnknownUser) {
+    fluss::Configuration config;
+    config.bootstrap_servers = sasl_servers();
+    config.security_protocol = "sasl";
+    config.security_sasl_mechanism = "PLAIN";
+    config.security_sasl_username = "nonexistent_user";
+    config.security_sasl_password = "some-password";
+
+    fluss::Connection conn;
+    auto result = fluss::Connection::Create(config, conn);
+    ASSERT_FALSE(result.Ok());
+    // TODO: same as above — should check error_code == AUTHENTICATE_EXCEPTION 
once fixed.
+    EXPECT_NE(result.error_message.find("Authentication failed"), 
std::string::npos)
+        << "Expected 'Authentication failed' in: " << result.error_message;
+}
+
+TEST_F(SaslAuthTest, SaslClientToPlaintextServer) {
+    fluss::Configuration config;
+    config.bootstrap_servers = plaintext_servers();
+    config.security_protocol = "sasl";
+    config.security_sasl_mechanism = "PLAIN";
+    config.security_sasl_username = "admin";
+    config.security_sasl_password = "admin-secret";
+
+    fluss::Connection conn;
+    auto result = fluss::Connection::Create(config, conn);
+    ASSERT_FALSE(result.Ok()) << "SASL client connecting to plaintext server 
should fail";
+}
diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h
index bae5237..98d119a 100644
--- a/bindings/cpp/test/test_utils.h
+++ b/bindings/cpp/test/test_utils.h
@@ -49,23 +49,46 @@
 
 namespace fluss_test {
 
-static constexpr const char* kFlussVersion = "0.7.0";
+static constexpr const char* kFlussImage = "apache/fluss";
+static constexpr const char* kFlussVersion = "0.8.0-incubating";
 static constexpr const char* kNetworkName = "fluss-cpp-test-network";
 static constexpr const char* kZookeeperName = "zookeeper-cpp-test";
 static constexpr const char* kCoordinatorName = "coordinator-server-cpp-test";
 static constexpr const char* kTabletServerName = "tablet-server-cpp-test";
 static constexpr int kCoordinatorPort = 9123;
 static constexpr int kTabletServerPort = 9124;
+static constexpr int kPlainClientPort = 9223;
+static constexpr int kPlainClientTabletPort = 9224;
 
 /// Execute a shell command and return its exit code.
-inline int RunCommand(const std::string& cmd) {
-    return system(cmd.c_str());
+inline int RunCommand(const std::string& cmd) { return system(cmd.c_str()); }
+
+/// Join property lines with the escaped newline separator used by `printf` in 
docker commands.
+inline std::string JoinProps(const std::vector<std::string>& lines) {
+    std::string result;
+    for (size_t i = 0; i < lines.size(); ++i) {
+        if (i > 0) result += "\\n";
+        result += lines[i];
+    }
+    return result;
+}
+
+/// Build a `docker run` command with FLUSS_PROPERTIES.
+inline std::string DockerRunCmd(const std::string& name, const std::string& 
props,
+                                const std::vector<std::string>& port_mappings,
+                                const std::string& server_type) {
+    std::string cmd = "docker run -d --rm --name " + name + " --network " + 
kNetworkName;
+    for (const auto& pm : port_mappings) {
+        cmd += " -p " + pm;
+    }
+    cmd += " -e FLUSS_PROPERTIES=\"$(printf '" + props + "')\"";
+    cmd += " " + std::string(kFlussImage) + ":" + kFlussVersion + " " + 
server_type;
+    return cmd;
 }
 
 /// Wait until a TCP port is accepting connections, or timeout.
 inline bool WaitForPort(const std::string& host, int port, int timeout_seconds 
= 60) {
-    auto deadline =
-        std::chrono::steady_clock::now() + 
std::chrono::seconds(timeout_seconds);
+    auto deadline = std::chrono::steady_clock::now() + 
std::chrono::seconds(timeout_seconds);
 
     while (std::chrono::steady_clock::now() < deadline) {
         int sock = socket(AF_INET, SOCK_STREAM, 0);
@@ -114,10 +137,8 @@ class FlussTestCluster {
         RunCommand(std::string("docker network create ") + kNetworkName + " 
2>/dev/null || true");
 
         // Start ZooKeeper
-        std::string zk_cmd = std::string("docker run -d --rm") +
-                              " --name " + kZookeeperName +
-                              " --network " + kNetworkName +
-                              " zookeeper:3.9.2";
+        std::string zk_cmd = std::string("docker run -d --rm") + " --name " + 
kZookeeperName +
+                             " --network " + kNetworkName + " zookeeper:3.9.2";
         if (RunCommand(zk_cmd) != 0) {
             std::cerr << "Failed to start ZooKeeper" << std::endl;
             return false;
@@ -126,23 +147,29 @@ class FlussTestCluster {
         // Wait for ZooKeeper to be ready before starting Fluss servers
         std::this_thread::sleep_for(std::chrono::seconds(5));
 
-        // Start Coordinator Server
-        std::string coord_props =
-            "zookeeper.address: " + std::string(kZookeeperName) + ":2181\\n"
-            "bind.listeners: INTERNAL://" + std::string(kCoordinatorName) + 
":0, CLIENT://" +
-            std::string(kCoordinatorName) + ":9123\\n"
-            "advertised.listeners: CLIENT://localhost:9123\\n"
-            "internal.listener.name: INTERNAL\\n"
-            "netty.server.num-network-threads: 1\\n"
-            "netty.server.num-worker-threads: 3";
-
-        std::string coord_cmd = std::string("docker run -d --rm") +
-                                " --name " + kCoordinatorName +
-                                " --network " + kNetworkName +
-                                " -p 9123:9123" +
-                                " -e FLUSS_PROPERTIES=\"$(printf '" + 
coord_props + "')\"" +
-                                " fluss/fluss:" + kFlussVersion +
-                                " coordinatorServer";
+        // Start Coordinator Server (dual listeners: CLIENT=SASL on 9123, 
PLAIN_CLIENT=plaintext on
+        // 9223)
+        std::string sasl_jaas =
+            "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule 
required"
+            " user_admin=\"admin-secret\" user_alice=\"alice-secret\";";
+
+        std::string coord = std::string(kCoordinatorName);
+        std::string zk = std::string(kZookeeperName);
+        std::string coord_props = JoinProps({
+            "zookeeper.address: " + zk + ":2181",
+            "bind.listeners: INTERNAL://" + coord + ":0, CLIENT://" + coord +
+                ":9123, PLAIN_CLIENT://" + coord + ":9223",
+            "advertised.listeners: CLIENT://localhost:9123, 
PLAIN_CLIENT://localhost:9223",
+            "internal.listener.name: INTERNAL",
+            "security.protocol.map: CLIENT:sasl",
+            "security.sasl.enabled.mechanisms: plain",
+            "security.sasl.plain.jaas.config: " + sasl_jaas,
+            "netty.server.num-network-threads: 1",
+            "netty.server.num-worker-threads: 3",
+        });
+
+        std::string coord_cmd = DockerRunCmd(kCoordinatorName, coord_props,
+                                             {"9123:9123", "9223:9223"}, 
"coordinatorServer");
         if (RunCommand(coord_cmd) != 0) {
             std::cerr << "Failed to start Coordinator Server" << std::endl;
             Stop();
@@ -156,24 +183,27 @@ class FlussTestCluster {
             return false;
         }
 
-        // Start Tablet Server
-        std::string ts_props =
-            "zookeeper.address: " + std::string(kZookeeperName) + ":2181\\n"
-            "bind.listeners: INTERNAL://" + std::string(kTabletServerName) + 
":0, CLIENT://" +
-            std::string(kTabletServerName) + ":9123\\n"
-            "advertised.listeners: CLIENT://localhost:" + 
std::to_string(kTabletServerPort) + "\\n"
-            "internal.listener.name: INTERNAL\\n"
-            "tablet-server.id: 0\\n"
-            "netty.server.num-network-threads: 1\\n"
-            "netty.server.num-worker-threads: 3";
-
-        std::string ts_cmd = std::string("docker run -d --rm") +
-                             " --name " + kTabletServerName +
-                             " --network " + kNetworkName +
-                             " -p " + std::to_string(kTabletServerPort) + 
":9123" +
-                             " -e FLUSS_PROPERTIES=\"$(printf '" + ts_props + 
"')\"" +
-                             " fluss/fluss:" + kFlussVersion +
-                             " tabletServer";
+        // Start Tablet Server (dual listeners: CLIENT=SASL on 9123, 
PLAIN_CLIENT=plaintext on 9223)
+        std::string ts = std::string(kTabletServerName);
+        std::string ts_props = JoinProps({
+            "zookeeper.address: " + zk + ":2181",
+            "bind.listeners: INTERNAL://" + ts + ":0, CLIENT://" + ts + 
":9123, PLAIN_CLIENT://" +
+                ts + ":9223",
+            "advertised.listeners: CLIENT://localhost:" + 
std::to_string(kTabletServerPort) +
+                ", PLAIN_CLIENT://localhost:" + 
std::to_string(kPlainClientTabletPort),
+            "internal.listener.name: INTERNAL",
+            "security.protocol.map: CLIENT:sasl",
+            "security.sasl.enabled.mechanisms: plain",
+            "security.sasl.plain.jaas.config: " + sasl_jaas,
+            "tablet-server.id: 0",
+            "netty.server.num-network-threads: 1",
+            "netty.server.num-worker-threads: 3",
+        });
+
+        std::string ts_cmd = DockerRunCmd(kTabletServerName, ts_props,
+                                          {std::to_string(kTabletServerPort) + 
":9123",
+                                           
std::to_string(kPlainClientTabletPort) + ":9223"},
+                                          "tabletServer");
         if (RunCommand(ts_cmd) != 0) {
             std::cerr << "Failed to start Tablet Server" << std::endl;
             Stop();
@@ -187,7 +217,20 @@ class FlussTestCluster {
             return false;
         }
 
-        bootstrap_servers_ = "127.0.0.1:9123";
+        // Wait for plaintext listeners
+        if (!WaitForPort("127.0.0.1", kPlainClientPort)) {
+            std::cerr << "Coordinator plaintext listener did not become ready" 
<< std::endl;
+            Stop();
+            return false;
+        }
+        if (!WaitForPort("127.0.0.1", kPlainClientTabletPort)) {
+            std::cerr << "Tablet Server plaintext listener did not become 
ready" << std::endl;
+            Stop();
+            return false;
+        }
+
+        bootstrap_servers_ = "127.0.0.1:" + std::to_string(kPlainClientPort);
+        sasl_bootstrap_servers_ = "127.0.0.1:" + 
std::to_string(kCoordinatorPort);
         std::cout << "Fluss cluster started successfully." << std::endl;
         return true;
     }
@@ -204,9 +247,11 @@ class FlussTestCluster {
     }
 
     const std::string& GetBootstrapServers() const { return 
bootstrap_servers_; }
+    const std::string& GetSaslBootstrapServers() const { return 
sasl_bootstrap_servers_; }
 
    private:
     std::string bootstrap_servers_;
+    std::string sasl_bootstrap_servers_;
     bool external_cluster_{false};
 };
 
@@ -230,8 +275,7 @@ class FlussTestEnvironment : public ::testing::Environment {
         fluss::Configuration config;
         config.bootstrap_servers = cluster_.GetBootstrapServers();
 
-        auto deadline =
-            std::chrono::steady_clock::now() + std::chrono::seconds(60);
+        auto deadline = std::chrono::steady_clock::now() + 
std::chrono::seconds(60);
         while (std::chrono::steady_clock::now() < deadline) {
             auto result = fluss::Connection::Create(config, connection_);
             if (result.Ok()) {
@@ -247,13 +291,12 @@ class FlussTestEnvironment : public 
::testing::Environment {
         GTEST_SKIP() << "Fluss cluster did not become ready within timeout.";
     }
 
-    void TearDown() override {
-        cluster_.Stop();
-    }
+    void TearDown() override { cluster_.Stop(); }
 
     fluss::Connection& GetConnection() { return connection_; }
     fluss::Admin& GetAdmin() { return admin_; }
     const std::string& GetBootstrapServers() { return 
cluster_.GetBootstrapServers(); }
+    const std::string& GetSaslBootstrapServers() { return 
cluster_.GetSaslBootstrapServers(); }
 
    private:
     FlussTestEnvironment() = default;
@@ -286,8 +329,8 @@ inline void CreatePartitions(fluss::Admin& admin, const 
fluss::TablePath& path,
 /// Poll a LogScanner for ScanRecords until `expected_count` items are 
collected or timeout.
 /// `extract_fn` is called for each ScanRecord and should return a value of 
type T.
 template <typename T, typename ExtractFn>
-void PollRecords(fluss::LogScanner& scanner, size_t expected_count,
-                 ExtractFn extract_fn, std::vector<T>& out) {
+void PollRecords(fluss::LogScanner& scanner, size_t expected_count, ExtractFn 
extract_fn,
+                 std::vector<T>& out) {
     auto deadline = std::chrono::steady_clock::now() + 
std::chrono::seconds(10);
     while (out.size() < expected_count && std::chrono::steady_clock::now() < 
deadline) {
         fluss::ScanRecords records;
@@ -301,8 +344,8 @@ void PollRecords(fluss::LogScanner& scanner, size_t 
expected_count,
 /// Poll a LogScanner for ArrowRecordBatches until `expected_count` items are 
collected or timeout.
 /// `extract_fn` is called with the full ArrowRecordBatches and should return 
a std::vector<T>.
 template <typename T, typename ExtractFn>
-void PollRecordBatches(fluss::LogScanner& scanner, size_t expected_count,
-                       ExtractFn extract_fn, std::vector<T>& out) {
+void PollRecordBatches(fluss::LogScanner& scanner, size_t expected_count, 
ExtractFn extract_fn,
+                       std::vector<T>& out) {
     auto deadline = std::chrono::steady_clock::now() + 
std::chrono::seconds(10);
     while (out.size() < expected_count && std::chrono::steady_clock::now() < 
deadline) {
         fluss::ArrowRecordBatches batches;
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index 9c0059e..4582d43 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -108,6 +108,23 @@ impl Config {
                             }
                         };
                     }
+                    "connect-timeout" => {
+                        config.connect_timeout_ms = 
value.parse::<u64>().map_err(|e| {
+                            FlussError::new_err(format!("Invalid value 
'{value}' for '{key}': {e}"))
+                        })?;
+                    }
+                    "security.protocol" => {
+                        config.security_protocol = value;
+                    }
+                    "security.sasl.mechanism" => {
+                        config.security_sasl_mechanism = value;
+                    }
+                    "security.sasl.username" => {
+                        config.security_sasl_username = value;
+                    }
+                    "security.sasl.password" => {
+                        config.security_sasl_password = value;
+                    }
                     _ => {
                         return Err(FlussError::new_err(format!("Unknown 
property: {key}")));
                     }
@@ -237,6 +254,66 @@ impl Config {
     fn set_writer_batch_timeout_ms(&mut self, timeout: i64) {
         self.inner.writer_batch_timeout_ms = timeout;
     }
+
+    /// Get the connect timeout in milliseconds
+    #[getter]
+    fn connect_timeout_ms(&self) -> u64 {
+        self.inner.connect_timeout_ms
+    }
+
+    /// Set the connect timeout in milliseconds
+    #[setter]
+    fn set_connect_timeout_ms(&mut self, timeout: u64) {
+        self.inner.connect_timeout_ms = timeout;
+    }
+
+    /// Get the security protocol
+    #[getter]
+    fn security_protocol(&self) -> String {
+        self.inner.security_protocol.clone()
+    }
+
+    /// Set the security protocol
+    #[setter]
+    fn set_security_protocol(&mut self, protocol: String) {
+        self.inner.security_protocol = protocol;
+    }
+
+    /// Get the SASL mechanism
+    #[getter]
+    fn security_sasl_mechanism(&self) -> String {
+        self.inner.security_sasl_mechanism.clone()
+    }
+
+    /// Set the SASL mechanism
+    #[setter]
+    fn set_security_sasl_mechanism(&mut self, mechanism: String) {
+        self.inner.security_sasl_mechanism = mechanism;
+    }
+
+    /// Get the SASL username
+    #[getter]
+    fn security_sasl_username(&self) -> String {
+        self.inner.security_sasl_username.clone()
+    }
+
+    /// Set the SASL username
+    #[setter]
+    fn set_security_sasl_username(&mut self, username: String) {
+        self.inner.security_sasl_username = username;
+    }
+
+    /// Get the SASL password
+    #[getter]
+    fn security_sasl_password(&self) -> String {
+        self.inner.security_sasl_password.clone()
+    }
+
+    /// Set the SASL password
+    #[setter]
+    fn set_security_sasl_password(&mut self, password: String) {
+        self.inner.security_sasl_password = password;
+    }
 }
 
 impl Config {
diff --git a/bindings/python/test/conftest.py b/bindings/python/test/conftest.py
index fbd7396..0a969e8 100644
--- a/bindings/python/test/conftest.py
+++ b/bindings/python/test/conftest.py
@@ -33,7 +33,8 @@ import pytest_asyncio
 
 import fluss
 
-FLUSS_VERSION = "0.7.0"
+FLUSS_IMAGE = "apache/fluss"
+FLUSS_VERSION = "0.8.0-incubating"
 BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")
 
 
@@ -53,7 +54,7 @@ def _wait_for_port(host, port, timeout=60):
 def fluss_cluster():
     """Start a Fluss cluster using testcontainers, or use an existing one."""
     if BOOTSTRAP_SERVERS_ENV:
-        yield BOOTSTRAP_SERVERS_ENV
+        yield (BOOTSTRAP_SERVERS_ENV, BOOTSTRAP_SERVERS_ENV)
         return
 
     from testcontainers.core.container import DockerContainer
@@ -68,20 +69,30 @@ def fluss_cluster():
         .with_name("zookeeper-python-test")
     )
 
+    sasl_jaas = (
+        "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required"
+        ' user_admin="admin-secret" user_alice="alice-secret";'
+    )
     coordinator_props = "\n".join([
         "zookeeper.address: zookeeper-python-test:2181",
         "bind.listeners: INTERNAL://coordinator-server-python-test:0,"
-        " CLIENT://coordinator-server-python-test:9123",
-        "advertised.listeners: CLIENT://localhost:9123",
+        " CLIENT://coordinator-server-python-test:9123,"
+        " PLAIN_CLIENT://coordinator-server-python-test:9223",
+        "advertised.listeners: CLIENT://localhost:9123,"
+        " PLAIN_CLIENT://localhost:9223",
         "internal.listener.name: INTERNAL",
+        "security.protocol.map: CLIENT:sasl",
+        "security.sasl.enabled.mechanisms: plain",
+        f"security.sasl.plain.jaas.config: {sasl_jaas}",
         "netty.server.num-network-threads: 1",
         "netty.server.num-worker-threads: 3",
     ])
     coordinator = (
-        DockerContainer(f"fluss/fluss:{FLUSS_VERSION}")
+        DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
         .with_network(network)
         .with_name("coordinator-server-python-test")
         .with_bind_ports(9123, 9123)
+        .with_bind_ports(9223, 9223)
         .with_command("coordinatorServer")
         .with_env("FLUSS_PROPERTIES", coordinator_props)
     )
@@ -89,18 +100,24 @@ def fluss_cluster():
     tablet_props = "\n".join([
         "zookeeper.address: zookeeper-python-test:2181",
         "bind.listeners: INTERNAL://tablet-server-python-test:0,"
-        " CLIENT://tablet-server-python-test:9123",
-        "advertised.listeners: CLIENT://localhost:9124",
+        " CLIENT://tablet-server-python-test:9123,"
+        " PLAIN_CLIENT://tablet-server-python-test:9223",
+        "advertised.listeners: CLIENT://localhost:9124,"
+        " PLAIN_CLIENT://localhost:9224",
         "internal.listener.name: INTERNAL",
+        "security.protocol.map: CLIENT:sasl",
+        "security.sasl.enabled.mechanisms: plain",
+        f"security.sasl.plain.jaas.config: {sasl_jaas}",
         "tablet-server.id: 0",
         "netty.server.num-network-threads: 1",
         "netty.server.num-worker-threads: 3",
     ])
     tablet_server = (
-        DockerContainer(f"fluss/fluss:{FLUSS_VERSION}")
+        DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
         .with_network(network)
         .with_name("tablet-server-python-test")
         .with_bind_ports(9123, 9124)
+        .with_bind_ports(9223, 9224)
         .with_command("tabletServer")
         .with_env("FLUSS_PROPERTIES", tablet_props)
     )
@@ -111,10 +128,13 @@ def fluss_cluster():
 
     _wait_for_port("localhost", 9123)
     _wait_for_port("localhost", 9124)
+    _wait_for_port("localhost", 9223)
+    _wait_for_port("localhost", 9224)
     # Extra wait for cluster to fully initialize
     time.sleep(10)
 
-    yield "127.0.0.1:9123"
+    # (plaintext_bootstrap, sasl_bootstrap)
+    yield ("127.0.0.1:9223", "127.0.0.1:9123")
 
     tablet_server.stop()
     coordinator.stop()
@@ -124,13 +144,28 @@ def fluss_cluster():
 
 @pytest_asyncio.fixture(scope="session")
 async def connection(fluss_cluster):
-    """Session-scoped connection to the Fluss cluster."""
-    config = fluss.Config({"bootstrap.servers": fluss_cluster})
+    """Session-scoped connection to the Fluss cluster (plaintext)."""
+    plaintext_addr, _sasl_addr = fluss_cluster
+    config = fluss.Config({"bootstrap.servers": plaintext_addr})
     conn = await fluss.FlussConnection.create(config)
     yield conn
     conn.close()
 
 
[email protected](scope="session")
+def sasl_bootstrap_servers(fluss_cluster):
+    """Bootstrap servers for the SASL listener."""
+    _plaintext_addr, sasl_addr = fluss_cluster
+    return sasl_addr
+
+
[email protected](scope="session")
+def plaintext_bootstrap_servers(fluss_cluster):
+    """Bootstrap servers for the plaintext (non-SASL) listener."""
+    plaintext_addr, _sasl_addr = fluss_cluster
+    return plaintext_addr
+
+
 @pytest_asyncio.fixture(scope="session")
 async def admin(connection):
     """Session-scoped admin client."""
diff --git a/bindings/python/test/test_sasl_auth.py 
b/bindings/python/test/test_sasl_auth.py
new file mode 100644
index 0000000..30fce4c
--- /dev/null
+++ b/bindings/python/test/test_sasl_auth.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Integration tests for SASL/PLAIN authentication.
+
+Mirrors the Rust integration tests in 
crates/fluss/tests/integration/sasl_auth.rs.
+"""
+
+import pytest
+
+import fluss
+
+
+async def test_sasl_connect_with_valid_credentials(sasl_bootstrap_servers):
+    """Verify that a client with correct SASL credentials can connect and 
perform operations."""
+    config = fluss.Config({
+        "bootstrap.servers": sasl_bootstrap_servers,
+        "security.protocol": "sasl",
+        "security.sasl.mechanism": "PLAIN",
+        "security.sasl.username": "admin",
+        "security.sasl.password": "admin-secret",
+    })
+    conn = await fluss.FlussConnection.create(config)
+    admin = await conn.get_admin()
+
+    db_name = "py_sasl_test_valid_db"
+    db_descriptor = fluss.DatabaseDescriptor(comment="created via SASL auth")
+    await admin.create_database(db_name, db_descriptor, ignore_if_exists=True)
+
+    assert await admin.database_exists(db_name)
+
+    # Cleanup
+    await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+    conn.close()
+
+
+async def test_sasl_connect_with_second_user(sasl_bootstrap_servers):
+    """Verify that a second user can also authenticate successfully."""
+    config = fluss.Config({
+        "bootstrap.servers": sasl_bootstrap_servers,
+        "security.protocol": "sasl",
+        "security.sasl.mechanism": "PLAIN",
+        "security.sasl.username": "alice",
+        "security.sasl.password": "alice-secret",
+    })
+    conn = await fluss.FlussConnection.create(config)
+    admin = await conn.get_admin()
+
+    # Basic operation to confirm functional connection
+    assert not await admin.database_exists("some_nonexistent_db_alice")
+    conn.close()
+
+
+async def test_sasl_connect_with_wrong_password(sasl_bootstrap_servers):
+    """Verify that wrong credentials are rejected with 
AUTHENTICATE_EXCEPTION."""
+    config = fluss.Config({
+        "bootstrap.servers": sasl_bootstrap_servers,
+        "security.protocol": "sasl",
+        "security.sasl.mechanism": "PLAIN",
+        "security.sasl.username": "admin",
+        "security.sasl.password": "wrong-password",
+    })
+    with pytest.raises(fluss.FlussError) as exc_info:
+        await fluss.FlussConnection.create(config)
+
+    assert exc_info.value.error_code == fluss.ErrorCode.AUTHENTICATE_EXCEPTION
+
+
+async def test_sasl_connect_with_unknown_user(sasl_bootstrap_servers):
+    """Verify that a nonexistent user is rejected with 
AUTHENTICATE_EXCEPTION."""
+    config = fluss.Config({
+        "bootstrap.servers": sasl_bootstrap_servers,
+        "security.protocol": "sasl",
+        "security.sasl.mechanism": "PLAIN",
+        "security.sasl.username": "nonexistent_user",
+        "security.sasl.password": "some-password",
+    })
+    with pytest.raises(fluss.FlussError) as exc_info:
+        await fluss.FlussConnection.create(config)
+
+    assert exc_info.value.error_code == fluss.ErrorCode.AUTHENTICATE_EXCEPTION
+
+
+async def test_sasl_client_to_plaintext_server(plaintext_bootstrap_servers):
+    """Verify that a SASL-configured client fails when connecting to a 
plaintext server."""
+    config = fluss.Config({
+        "bootstrap.servers": plaintext_bootstrap_servers,
+        "security.protocol": "sasl",
+        "security.sasl.mechanism": "PLAIN",
+        "security.sasl.username": "admin",
+        "security.sasl.password": "admin-secret",
+    })
+    with pytest.raises(fluss.FlussError):
+        await fluss.FlussConnection.create(config)
diff --git a/crates/fluss/src/client/connection.rs 
b/crates/fluss/src/client/connection.rs
index a17e57f..703b588 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -23,6 +23,7 @@ use crate::config::Config;
 use crate::rpc::RpcClient;
 use parking_lot::RwLock;
 use std::sync::Arc;
+use std::time::Duration;
 
 use crate::error::{Error, FlussError, Result};
 use crate::metadata::TablePath;
@@ -36,7 +37,22 @@ pub struct FlussConnection {
 
 impl FlussConnection {
     pub async fn new(arg: Config) -> Result<Self> {
-        let connections = Arc::new(RpcClient::new());
+        arg.validate_security()
+            .map_err(|msg| Error::IllegalArgument { message: msg })?;
+
+        let timeout = Duration::from_millis(arg.connect_timeout_ms);
+        let connections = if arg.is_sasl_enabled() {
+            Arc::new(
+                RpcClient::new()
+                    .with_sasl(
+                        arg.security_sasl_username.clone(),
+                        arg.security_sasl_password.clone(),
+                    )
+                    .with_timeout(timeout),
+            )
+        } else {
+            Arc::new(RpcClient::new().with_timeout(timeout))
+        };
         let metadata = Metadata::new(arg.bootstrap_servers.as_str(), 
connections.clone()).await?;
 
         Ok(FlussConnection {
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index a0d7e70..438c948 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -30,6 +30,9 @@ const DEFAULT_MAX_POLL_RECORDS: usize = 500;
 const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;
 
 const DEFAULT_ACKS: &str = "all";
+const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 120_000;
+const DEFAULT_SECURITY_PROTOCOL: &str = "PLAINTEXT";
+const DEFAULT_SASL_MECHANISM: &str = "PLAIN";
 
 /// Bucket assigner strategy for tables without bucket keys.
 /// Matches Java `client.writer.bucket.no-key-assigner`.
@@ -51,7 +54,7 @@ impl fmt::Display for NoKeyAssigner {
     }
 }
 
-#[derive(Parser, Debug, Clone, Deserialize, Serialize)]
+#[derive(Parser, Clone, Deserialize, Serialize)]
 #[command(author, version, about, long_about = None)]
 pub struct Config {
     #[arg(long, default_value_t = String::from(DEFAULT_BOOTSTRAP_SERVER))]
@@ -96,6 +99,58 @@ pub struct Config {
     /// Default: 100 (matching Java CLIENT_WRITER_BATCH_TIMEOUT)
     #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)]
     pub writer_batch_timeout_ms: i64,
+
+    /// Connect timeout in milliseconds for TCP transport connect.
+    /// Default: 120000 (120 seconds).
+    #[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT_MS)]
+    pub connect_timeout_ms: u64,
+
+    #[arg(long, default_value_t = String::from(DEFAULT_SECURITY_PROTOCOL))]
+    pub security_protocol: String,
+
+    #[arg(long, default_value_t = String::from(DEFAULT_SASL_MECHANISM))]
+    pub security_sasl_mechanism: String,
+
+    #[arg(long, default_value_t = String::new())]
+    pub security_sasl_username: String,
+
+    #[arg(long, default_value_t = String::new())]
+    #[serde(skip_serializing)]
+    pub security_sasl_password: String,
+}
+
+impl std::fmt::Debug for Config {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Config")
+            .field("bootstrap_servers", &self.bootstrap_servers)
+            .field("writer_request_max_size", &self.writer_request_max_size)
+            .field("writer_acks", &self.writer_acks)
+            .field("writer_retries", &self.writer_retries)
+            .field("writer_batch_size", &self.writer_batch_size)
+            .field(
+                "writer_bucket_no_key_assigner",
+                &self.writer_bucket_no_key_assigner,
+            )
+            .field(
+                "scanner_remote_log_prefetch_num",
+                &self.scanner_remote_log_prefetch_num,
+            )
+            .field(
+                "remote_file_download_thread_num",
+                &self.remote_file_download_thread_num,
+            )
+            .field(
+                "scanner_log_max_poll_records",
+                &self.scanner_log_max_poll_records,
+            )
+            .field("writer_batch_timeout_ms", &self.writer_batch_timeout_ms)
+            .field("connect_timeout_ms", &self.connect_timeout_ms)
+            .field("security_protocol", &self.security_protocol)
+            .field("security_sasl_mechanism", &self.security_sasl_mechanism)
+            .field("security_sasl_username", &self.security_sasl_username)
+            .field("security_sasl_password", &"[REDACTED]")
+            .finish()
+    }
 }
 
 impl Default for Config {
@@ -112,6 +167,115 @@ impl Default for Config {
             scanner_remote_log_read_concurrency: 
DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY,
             scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
             writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
+            connect_timeout_ms: DEFAULT_CONNECT_TIMEOUT_MS,
+            security_protocol: String::from(DEFAULT_SECURITY_PROTOCOL),
+            security_sasl_mechanism: String::from(DEFAULT_SASL_MECHANISM),
+            security_sasl_username: String::new(),
+            security_sasl_password: String::new(),
+        }
+    }
+}
+
+impl Config {
+    /// Returns true when the security protocol indicates SASL authentication
+    /// should be performed. Matches Java's `SaslAuthenticationPlugin` which
+    /// registers as `"sasl"` (case-insensitive).
+    pub fn is_sasl_enabled(&self) -> bool {
+        self.security_protocol.eq_ignore_ascii_case("sasl")
+    }
+
+    /// Validates security configuration. Returns `Ok(())` when the config is
+    /// consistent, or an error message when SASL is enabled but the config is
+    /// incomplete or uses an unsupported mechanism.
+    pub fn validate_security(&self) -> Result<(), String> {
+        if !self.is_sasl_enabled() {
+            return Ok(());
+        }
+        if !self.security_sasl_mechanism.eq_ignore_ascii_case("PLAIN") {
+            return Err(format!(
+                "Unsupported SASL mechanism: '{}'. Only 'PLAIN' is supported.",
+                self.security_sasl_mechanism
+            ));
+        }
+        if self.security_sasl_username.is_empty() {
+            return Err(
+                "security_sasl_username must be set when security_protocol is 
'sasl'".to_string(),
+            );
         }
+        if self.security_sasl_password.is_empty() {
+            return Err(
+                "security_sasl_password must be set when security_protocol is 
'sasl'".to_string(),
+            );
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_default_is_not_sasl() {
+        let config = Config::default();
+        assert!(!config.is_sasl_enabled());
+        assert!(config.validate_security().is_ok());
+    }
+
+    #[test]
+    fn test_sasl_enabled_valid() {
+        let config = Config {
+            security_protocol: "sasl".to_string(),
+            security_sasl_mechanism: "PLAIN".to_string(),
+            security_sasl_username: "admin".to_string(),
+            security_sasl_password: "secret".to_string(),
+            ..Config::default()
+        };
+        assert!(config.is_sasl_enabled());
+        assert!(config.validate_security().is_ok());
+    }
+
+    #[test]
+    fn test_sasl_enabled_case_insensitive() {
+        let config = Config {
+            security_protocol: "SASL".to_string(),
+            security_sasl_username: "admin".to_string(),
+            security_sasl_password: "secret".to_string(),
+            ..Config::default()
+        };
+        assert!(config.is_sasl_enabled());
+        assert!(config.validate_security().is_ok());
+    }
+
+    #[test]
+    fn test_sasl_missing_username() {
+        let config = Config {
+            security_protocol: "sasl".to_string(),
+            security_sasl_password: "secret".to_string(),
+            ..Config::default()
+        };
+        assert!(config.validate_security().is_err());
+    }
+
+    #[test]
+    fn test_sasl_missing_password() {
+        let config = Config {
+            security_protocol: "sasl".to_string(),
+            security_sasl_username: "admin".to_string(),
+            ..Config::default()
+        };
+        assert!(config.validate_security().is_err());
+    }
+
+    #[test]
+    fn test_sasl_unsupported_mechanism() {
+        let config = Config {
+            security_protocol: "sasl".to_string(),
+            security_sasl_mechanism: "SCRAM-SHA-256".to_string(),
+            security_sasl_username: "admin".to_string(),
+            security_sasl_password: "secret".to_string(),
+            ..Config::default()
+        };
+        assert!(config.validate_security().is_err());
     }
 }
diff --git a/crates/fluss/src/proto/fluss_api.proto 
b/crates/fluss/src/proto/fluss_api.proto
index eca4cf3..1c7ee7e 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -408,4 +408,13 @@ message DropPartitionRequest {
   required bool ignore_if_not_exists = 3;
 }
 
-message DropPartitionResponse {}
\ No newline at end of file
+message DropPartitionResponse {}
+
+message AuthenticateRequest {
+  required string protocol = 1;
+  required bytes token = 2;
+}
+
+message AuthenticateResponse {
+  optional bytes challenge = 1;
+}
\ No newline at end of file
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index f6009c0..4231fb0 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -40,6 +40,7 @@ pub enum ApiKey {
     GetLatestLakeSnapshot,
     CreatePartition,
     DropPartition,
+    Authenticate,
     Unknown(i16),
 }
 
@@ -67,6 +68,7 @@ impl From<i16> for ApiKey {
             1035 => ApiKey::GetDatabaseInfo,
             1036 => ApiKey::CreatePartition,
             1037 => ApiKey::DropPartition,
+            1038 => ApiKey::Authenticate,
             _ => Unknown(key),
         }
     }
@@ -96,6 +98,7 @@ impl From<ApiKey> for i16 {
             ApiKey::GetDatabaseInfo => 1035,
             ApiKey::CreatePartition => 1036,
             ApiKey::DropPartition => 1037,
+            ApiKey::Authenticate => 1038,
             Unknown(x) => x,
         }
     }
@@ -129,6 +132,7 @@ mod tests {
             (1035, ApiKey::GetDatabaseInfo),
             (1036, ApiKey::CreatePartition),
             (1037, ApiKey::DropPartition),
+            (1038, ApiKey::Authenticate),
         ];
 
         for (raw, key) in cases {
diff --git a/crates/fluss/src/rpc/message/authenticate.rs 
b/crates/fluss/src/rpc/message/authenticate.rs
new file mode 100644
index 0000000..1292cdc
--- /dev/null
+++ b/crates/fluss/src/rpc/message/authenticate.rs
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::{AuthenticateRequest as ProtoAuthenticateRequest, 
AuthenticateResponse};
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug, Clone)]
+pub struct AuthenticateRequest {
+    pub inner_request: ProtoAuthenticateRequest,
+}
+
+impl AuthenticateRequest {
+    /// Build a SASL/PLAIN authenticate request.
+    /// Token format: `\0<username>\0<password>` (NUL-separated UTF-8).
+    pub fn new_plain(username: &str, password: &str) -> Self {
+        let mut token = Vec::with_capacity(1 + username.len() + 1 + 
password.len());
+        token.push(0u8);
+        token.extend_from_slice(username.as_bytes());
+        token.push(0u8);
+        token.extend_from_slice(password.as_bytes());
+
+        Self {
+            inner_request: ProtoAuthenticateRequest {
+                protocol: "PLAIN".to_string(),
+                token,
+            },
+        }
+    }
+
+    /// Build an authenticate request from a server challenge (for multi-round 
auth).
+    pub fn from_challenge(protocol: &str, challenge: Vec<u8>) -> Self {
+        Self {
+            inner_request: ProtoAuthenticateRequest {
+                protocol: protocol.to_string(),
+                token: challenge,
+            },
+        }
+    }
+}
+
+impl RequestBody for AuthenticateRequest {
+    type ResponseBody = AuthenticateResponse;
+    const API_KEY: ApiKey = ApiKey::Authenticate;
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(AuthenticateRequest);
+impl_read_version_type!(AuthenticateResponse);
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_new_plain_token_format() {
+        let req = AuthenticateRequest::new_plain("admin", "secret");
+        assert_eq!(req.inner_request.protocol, "PLAIN");
+        assert_eq!(req.inner_request.token, b"\0admin\0secret");
+    }
+
+    #[test]
+    fn test_new_plain_empty_credentials() {
+        let req = AuthenticateRequest::new_plain("", "");
+        assert_eq!(req.inner_request.token, b"\0\0");
+    }
+}
diff --git a/crates/fluss/src/rpc/message/mod.rs 
b/crates/fluss/src/rpc/message/mod.rs
index addb97a..9ad4545 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -20,6 +20,7 @@ use crate::rpc::api_version::ApiVersion;
 use crate::rpc::frame::{ReadError, WriteError};
 use bytes::{Buf, BufMut};
 
+mod authenticate;
 mod create_database;
 mod create_partition;
 mod create_table;
@@ -44,6 +45,7 @@ mod table_exists;
 mod update_metadata;
 
 pub use crate::rpc::RpcError;
+pub use authenticate::*;
 pub use create_database::*;
 pub use create_partition::*;
 pub use create_table::*;
diff --git a/crates/fluss/src/rpc/server_connection.rs 
b/crates/fluss/src/rpc/server_connection.rs
index a345c2f..13c5d9c 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -29,6 +29,7 @@ use futures::future::BoxFuture;
 use log::warn;
 use parking_lot::{Mutex, RwLock};
 use std::collections::HashMap;
+use std::fmt;
 use std::io::Cursor;
 use std::ops::DerefMut;
 use std::sync::Arc;
@@ -44,12 +45,34 @@ pub type MessengerTransport = 
ServerConnectionInner<BufStream<Transport>>;
 
 pub type ServerConnection = Arc<MessengerTransport>;
 
+// Matches Java's ExponentialBackoff(100ms initial, 2x multiplier, 5000ms max, 
0.2 jitter).
+const AUTH_INITIAL_BACKOFF_MS: f64 = 100.0;
+const AUTH_MAX_BACKOFF_MS: f64 = 5000.0;
+const AUTH_BACKOFF_MULTIPLIER: f64 = 2.0;
+const AUTH_JITTER: f64 = 0.2;
+
+#[derive(Clone)]
+pub struct SaslConfig {
+    pub username: String,
+    pub password: String,
+}
+
+impl fmt::Debug for SaslConfig {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("SaslConfig")
+            .field("username", &self.username)
+            .field("password", &"[REDACTED]")
+            .finish()
+    }
+}
+
 #[derive(Debug, Default)]
 pub struct RpcClient {
     connections: RwLock<HashMap<String, ServerConnection>>,
     client_id: Arc<str>,
     timeout: Option<Duration>,
     max_message_size: usize,
+    sasl_config: Option<SaslConfig>,
 }
 
 impl RpcClient {
@@ -59,13 +82,24 @@ impl RpcClient {
             client_id: Arc::from(""),
             timeout: None,
             max_message_size: usize::MAX,
+            sasl_config: None,
         }
     }
 
+    pub fn with_timeout(mut self, timeout: Duration) -> Self {
+        self.timeout = Some(timeout);
+        self
+    }
+
+    pub fn with_sasl(mut self, username: String, password: String) -> Self {
+        self.sasl_config = Some(SaslConfig { username, password });
+        self
+    }
+
     pub async fn get_connection(
         &self,
         server_node: &ServerNode,
-    ) -> Result<ServerConnection, RpcError> {
+    ) -> Result<ServerConnection, Error> {
         let server_id = server_node.uid();
         {
             let connections = self.connections.read();
@@ -89,7 +123,7 @@ impl RpcClient {
         Ok(new_server)
     }
 
-    async fn connect(&self, server_node: &ServerNode) -> 
Result<ServerConnection, RpcError> {
+    async fn connect(&self, server_node: &ServerNode) -> 
Result<ServerConnection, Error> {
         let url = server_node.url();
         let transport = Transport::connect(&url, self.timeout)
             .await
@@ -100,7 +134,74 @@ impl RpcClient {
             self.max_message_size,
             self.client_id.clone(),
         );
-        Ok(ServerConnection::new(messenger))
+        let connection = ServerConnection::new(messenger);
+
+        if let Some(ref sasl) = self.sasl_config {
+            Self::authenticate(&connection, &sasl.username, 
&sasl.password).await?;
+        }
+
+        Ok(connection)
+    }
+
+    /// Perform SASL/PLAIN authentication handshake.
+    ///
+    /// Retries on `RetriableAuthenticateException` with exponential backoff
+    /// (matching Java's unbounded retry behaviour). Non-retriable errors
+    /// (wrong password, unknown user) propagate immediately as
+    /// `Error::FlussAPIError` with the original error code.
+    async fn authenticate(
+        connection: &ServerConnection,
+        username: &str,
+        password: &str,
+    ) -> Result<(), Error> {
+        use crate::rpc::fluss_api_error::FlussError;
+        use crate::rpc::message::AuthenticateRequest;
+        use rand::Rng;
+
+        let initial_request = AuthenticateRequest::new_plain(username, 
password);
+        let mut retry_count: u32 = 0;
+
+        loop {
+            let request = initial_request.clone();
+            let result = connection.request(request).await;
+
+            match result {
+                Ok(response) => {
+                    // Check for server challenge (multi-round auth).
+                    // PLAIN mechanism never sends a challenge, but we handle 
it
+                    // for protocol correctness matching Java's 
handleAuthenticateResponse.
+                    if let Some(challenge) = response.challenge {
+                        let challenge_req = 
AuthenticateRequest::from_challenge("PLAIN", challenge);
+                        connection.request(challenge_req).await?;
+                    }
+                    return Ok(());
+                }
+                Err(Error::FlussAPIError { ref api_error })
+                    if FlussError::for_code(api_error.code)
+                        == FlussError::RetriableAuthenticateException =>
+                {
+                    retry_count += 1;
+                    // Cap the exponent like Java's ExponentialBackoff.expMax 
so that
+                    // jitter still produces a range at steady state instead 
of being
+                    // clamped to AUTH_MAX_BACKOFF_MS.
+                    let exp_max = (AUTH_MAX_BACKOFF_MS / 
AUTH_INITIAL_BACKOFF_MS).log2();
+                    let exp = ((retry_count as f64) - 1.0).min(exp_max);
+                    let term = AUTH_INITIAL_BACKOFF_MS * 
AUTH_BACKOFF_MULTIPLIER.powf(exp);
+                    let jitter_factor =
+                        1.0 - AUTH_JITTER + rand::rng().random::<f64>() * (2.0 
* AUTH_JITTER);
+                    let backoff_ms = (term * jitter_factor) as u64;
+                    log::warn!(
+                        "SASL authentication retriable failure (attempt 
{retry_count}), \
+                         retrying in {backoff_ms}ms: {}",
+                        api_error.message
+                    );
+                    
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
+                }
+                // Server-side auth errors (wrong password, unknown user, etc.)
+                // propagate with their original error code preserved.
+                Err(e) => return Err(e),
+            }
+        }
     }
 }
 
diff --git a/crates/fluss/tests/integration/admin.rs 
b/crates/fluss/tests/integration/admin.rs
index 3502923..5bbdaf0 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -15,49 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::integration::fluss_cluster::FlussTestingCluster;
-use parking_lot::RwLock;
-
-use std::sync::Arc;
-use std::sync::LazyLock;
-
-#[cfg(test)]
-use test_env_helpers::*;
-
-// Module-level shared cluster instance (only for this test file)
-static SHARED_FLUSS_CLUSTER: 
LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>> =
-    LazyLock::new(|| Arc::new(RwLock::new(None)));
-
 #[cfg(test)]
-#[before_all]
-#[after_all]
 mod admin_test {
-    use super::SHARED_FLUSS_CLUSTER;
-    use crate::integration::fluss_cluster::FlussTestingCluster;
-    use crate::integration::utils::{get_cluster, start_cluster, stop_cluster};
+    use crate::integration::utils::get_shared_cluster;
     use fluss::error::FlussError;
     use fluss::metadata::{
         DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, 
PartitionSpec, Schema,
         TableDescriptor, TablePath,
     };
     use std::collections::HashMap;
-    use std::sync::Arc;
-
-    fn before_all() {
-        start_cluster("test-admin", SHARED_FLUSS_CLUSTER.clone());
-    }
-
-    fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
-        get_cluster(&SHARED_FLUSS_CLUSTER)
-    }
-
-    fn after_all() {
-        stop_cluster(SHARED_FLUSS_CLUSTER.clone());
-    }
 
     #[tokio::test]
     async fn test_create_database() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.expect("should get admin");
@@ -97,13 +67,11 @@ mod admin_test {
 
         // database shouldn't exist now
         assert!(!admin.database_exists(db_name).await.unwrap());
-
-        // Note: We don't stop the shared cluster here as it's used by other 
tests
     }
 
     #[tokio::test]
     async fn test_create_table() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
         let admin = connection
             .get_admin()
@@ -232,7 +200,7 @@ mod admin_test {
 
     #[tokio::test]
     async fn test_partition_apis() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
         let admin = connection
             .get_admin()
@@ -371,7 +339,7 @@ mod admin_test {
 
     #[tokio::test]
     async fn test_fluss_error_response() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
         let admin = connection
             .get_admin()
@@ -405,7 +373,7 @@ mod admin_test {
 
     #[tokio::test]
     async fn test_error_database_not_exist() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
         let admin = connection.get_admin().await.unwrap();
 
@@ -424,7 +392,7 @@ mod admin_test {
 
     #[tokio::test]
     async fn test_error_database_already_exist() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
         let admin = connection.get_admin().await.unwrap();
 
@@ -454,7 +422,7 @@ mod admin_test {
 
     #[tokio::test]
     async fn test_error_table_already_exist() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
         let admin = connection.get_admin().await.unwrap();
 
@@ -502,7 +470,7 @@ mod admin_test {
 
     #[tokio::test]
     async fn test_error_table_not_exist() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
         let admin = connection.get_admin().await.unwrap();
 
@@ -564,7 +532,7 @@ mod admin_test {
 
     #[tokio::test]
     async fn test_error_table_not_partitioned() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
         let admin = connection.get_admin().await.unwrap();
 
diff --git a/crates/fluss/tests/integration/fluss_cluster.rs 
b/crates/fluss/tests/integration/fluss_cluster.rs
index e4dcad9..a2e9157 100644
--- a/crates/fluss/tests/integration/fluss_cluster.rs
+++ b/crates/fluss/tests/integration/fluss_cluster.rs
@@ -25,7 +25,8 @@ use testcontainers::core::ContainerPort;
 use testcontainers::runners::AsyncRunner;
 use testcontainers::{ContainerAsync, GenericImage, ImageExt};
 
-const FLUSS_VERSION: &str = "0.7.0";
+const FLUSS_VERSION: &str = "0.8.0-incubating";
+const FLUSS_IMAGE: &str = "apache/fluss";
 
 pub struct FlussTestingClusterBuilder {
     number_of_tablet_servers: i32,
@@ -33,9 +34,20 @@ pub struct FlussTestingClusterBuilder {
     cluster_conf: HashMap<String, String>,
     testing_name: String,
     remote_data_dir: Option<std::path::PathBuf>,
+    sasl_enabled: bool,
+    sasl_users: Vec<(String, String)>,
+    /// Host port for the coordinator server (default 9123).
+    coordinator_host_port: u16,
+    /// Host port for the plaintext (PLAIN_CLIENT) listener.
+    /// When set together with `sasl_enabled`, the cluster exposes two 
listeners:
+    /// CLIENT (SASL) on `coordinator_host_port` and PLAIN_CLIENT on this port.
+    plain_client_port: Option<u16>,
+    image: String,
+    image_tag: String,
 }
 
 impl FlussTestingClusterBuilder {
+    #[allow(dead_code)]
     pub fn new(testing_name: impl Into<String>) -> Self {
         Self::new_with_cluster_conf(testing_name.into(), &HashMap::default())
     }
@@ -47,6 +59,18 @@ impl FlussTestingClusterBuilder {
         self
     }
 
+    /// Enable SASL/PLAIN authentication on the cluster with dual listeners.
+    /// Users are specified as `(username, password)` pairs.
+    /// This automatically configures a PLAIN_CLIENT (plaintext) listener in 
addition
+    /// to the CLIENT (SASL) listener, allowing both authenticated and 
unauthenticated
+    /// connections on the same cluster.
+    pub fn with_sasl(mut self, users: Vec<(String, String)>) -> Self {
+        self.sasl_enabled = true;
+        self.sasl_users = users;
+        self.plain_client_port = Some(self.coordinator_host_port + 100);
+        self
+    }
+
     pub fn new_with_cluster_conf(
         testing_name: impl Into<String>,
         conf: &HashMap<String, String>,
@@ -68,6 +92,12 @@ impl FlussTestingClusterBuilder {
             network: "fluss-cluster-network",
             testing_name: testing_name.into(),
             remote_data_dir: None,
+            sasl_enabled: false,
+            sasl_users: Vec::new(),
+            coordinator_host_port: 9123,
+            plain_client_port: None,
+            image: FLUSS_IMAGE.to_string(),
+            image_tag: FLUSS_VERSION.to_string(),
         }
     }
 
@@ -84,6 +114,43 @@ impl FlussTestingClusterBuilder {
     }
 
     pub async fn build(&mut self) -> FlussTestingCluster {
+        // Remove stale containers from previous runs (if any) so we can reuse 
names.
+        let stale_containers: Vec<String> = 
std::iter::once(self.zookeeper_container_name())
+            .chain(std::iter::once(self.coordinator_server_container_name()))
+            .chain(
+                (0..self.number_of_tablet_servers).map(|id| 
self.tablet_server_container_name(id)),
+            )
+            .collect();
+        for name in &stale_containers {
+            let _ = std::process::Command::new("docker")
+                .args(["rm", "-f", name])
+                .output();
+        }
+
+        // Inject SASL server-side configuration into cluster_conf
+        if self.sasl_enabled && !self.sasl_users.is_empty() {
+            self.cluster_conf.insert(
+                "security.protocol.map".to_string(),
+                "CLIENT:sasl".to_string(),
+            );
+            self.cluster_conf.insert(
+                "security.sasl.enabled.mechanisms".to_string(),
+                "plain".to_string(),
+            );
+            // Build JAAS config: user_<name>="<password>" for each user
+            let user_entries: Vec<String> = self
+                .sasl_users
+                .iter()
+                .map(|(u, p)| format!("user_{}=\"{}\"", u, p))
+                .collect();
+            let jaas_config = format!(
+                "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule 
required {};",
+                user_entries.join(" ")
+            );
+            self.cluster_conf
+                .insert("security.sasl.plain.jaas.config".to_string(), 
jaas_config);
+        }
+
         let zookeeper = Arc::new(
             GenericImage::new("zookeeper", "3.9.2")
                 .with_network(self.network)
@@ -103,64 +170,122 @@ impl FlussTestingClusterBuilder {
             );
         }
 
+        // When dual listeners are configured, bootstrap_servers points to the 
plaintext
+        // listener and sasl_bootstrap_servers points to the SASL listener.
+        let (bootstrap_servers, sasl_bootstrap_servers) =
+            if let Some(plain_port) = self.plain_client_port {
+                (
+                    format!("127.0.0.1:{}", plain_port),
+                    Some(format!("127.0.0.1:{}", self.coordinator_host_port)),
+                )
+            } else {
+                (format!("127.0.0.1:{}", self.coordinator_host_port), None)
+            };
+
         FlussTestingCluster {
             zookeeper,
             coordinator_server,
             tablet_servers,
-            bootstrap_servers: "127.0.0.1:9123".to_string(),
+            bootstrap_servers,
+            sasl_bootstrap_servers,
             remote_data_dir: self.remote_data_dir.clone(),
+            sasl_users: self.sasl_users.clone(),
+            container_names: stale_containers,
         }
     }
 
     async fn start_coordinator_server(&mut self) -> 
ContainerAsync<GenericImage> {
+        let port = self.coordinator_host_port;
+        let container_name = self.coordinator_server_container_name();
         let mut coordinator_confs = HashMap::new();
         coordinator_confs.insert(
             "zookeeper.address",
             format!("{}:2181", self.zookeeper_container_name()),
         );
-        coordinator_confs.insert(
-            "bind.listeners",
-            format!(
-                "INTERNAL://{}:0, CLIENT://{}:9123",
-                self.coordinator_server_container_name(),
-                self.coordinator_server_container_name()
-            ),
-        );
-        coordinator_confs.insert(
-            "advertised.listeners",
-            "CLIENT://localhost:9123".to_string(),
-        );
+
+        if let Some(plain_port) = self.plain_client_port {
+            // Dual listeners: CLIENT (SASL) + PLAIN_CLIENT (plaintext)
+            coordinator_confs.insert(
+                "bind.listeners",
+                format!(
+                    "INTERNAL://{}:0, CLIENT://{}:{}, PLAIN_CLIENT://{}:{}",
+                    container_name, container_name, port, container_name, 
plain_port
+                ),
+            );
+            coordinator_confs.insert(
+                "advertised.listeners",
+                format!(
+                    "CLIENT://localhost:{}, PLAIN_CLIENT://localhost:{}",
+                    port, plain_port
+                ),
+            );
+        } else {
+            coordinator_confs.insert(
+                "bind.listeners",
+                format!(
+                    "INTERNAL://{}:0, CLIENT://{}:{}",
+                    container_name, container_name, port
+                ),
+            );
+            coordinator_confs.insert(
+                "advertised.listeners",
+                format!("CLIENT://localhost:{}", port),
+            );
+        }
+
         coordinator_confs.insert("internal.listener.name", 
"INTERNAL".to_string());
-        GenericImage::new("fluss/fluss", FLUSS_VERSION)
+
+        let mut image = GenericImage::new(&self.image, &self.image_tag)
             .with_container_name(self.coordinator_server_container_name())
-            .with_mapped_port(9123, ContainerPort::Tcp(9123))
+            .with_mapped_port(port, ContainerPort::Tcp(port))
             .with_network(self.network)
             .with_cmd(vec!["coordinatorServer"])
             .with_env_var(
                 "FLUSS_PROPERTIES",
                 self.to_fluss_properties_with(coordinator_confs),
-            )
-            .start()
-            .await
-            .unwrap()
+            );
+
+        if let Some(plain_port) = self.plain_client_port {
+            image = image.with_mapped_port(plain_port, 
ContainerPort::Tcp(plain_port));
+        }
+
+        image.start().await.unwrap()
     }
 
     async fn start_tablet_server(&self, server_id: i32) -> 
ContainerAsync<GenericImage> {
+        let port = self.coordinator_host_port;
+        let container_name = self.tablet_server_container_name(server_id);
         let mut tablet_server_confs = HashMap::new();
-        let bind_listeners = format!(
-            "INTERNAL://{}:0, CLIENT://{}:9123",
-            self.tablet_server_container_name(server_id),
-            self.tablet_server_container_name(server_id),
-        );
-        let expose_host_port = 9124 + server_id;
-        let advertised_listeners = format!("CLIENT://localhost:{}", 
expose_host_port);
+        let expose_host_port = (port as i32) + 1 + server_id;
         let tablet_server_id = format!("{}", server_id);
+
+        if let Some(plain_port) = self.plain_client_port {
+            // Dual listeners: CLIENT (SASL) + PLAIN_CLIENT (plaintext)
+            let bind_listeners = format!(
+                "INTERNAL://{}:0, CLIENT://{}:{}, PLAIN_CLIENT://{}:{}",
+                container_name, container_name, port, container_name, 
plain_port,
+            );
+            let plain_expose_host_port = (plain_port as i32) + 1 + server_id;
+            let advertised_listeners = format!(
+                "CLIENT://localhost:{}, PLAIN_CLIENT://localhost:{}",
+                expose_host_port, plain_expose_host_port
+            );
+            tablet_server_confs.insert("bind.listeners", bind_listeners);
+            tablet_server_confs.insert("advertised.listeners", 
advertised_listeners);
+        } else {
+            let bind_listeners = format!(
+                "INTERNAL://{}:0, CLIENT://{}:{}",
+                container_name, container_name, port,
+            );
+            let advertised_listeners = format!("CLIENT://localhost:{}", 
expose_host_port);
+            tablet_server_confs.insert("bind.listeners", bind_listeners);
+            tablet_server_confs.insert("advertised.listeners", 
advertised_listeners);
+        }
+
         tablet_server_confs.insert(
             "zookeeper.address",
             format!("{}:2181", self.zookeeper_container_name()),
         );
-        tablet_server_confs.insert("bind.listeners", bind_listeners);
-        tablet_server_confs.insert("advertised.listeners", 
advertised_listeners);
         tablet_server_confs.insert("internal.listener.name", 
"INTERNAL".to_string());
         tablet_server_confs.insert("tablet-server.id", tablet_server_id);
 
@@ -172,9 +297,9 @@ impl FlussTestingClusterBuilder {
                 remote_data_dir.to_string_lossy().to_string(),
             );
         }
-        let mut image = GenericImage::new("fluss/fluss", FLUSS_VERSION)
+        let mut image = GenericImage::new(&self.image, &self.image_tag)
             .with_cmd(vec!["tabletServer"])
-            .with_mapped_port(expose_host_port as u16, 
ContainerPort::Tcp(9123))
+            .with_mapped_port(expose_host_port as u16, 
ContainerPort::Tcp(port))
             .with_network(self.network)
             .with_container_name(self.tablet_server_container_name(server_id))
             .with_env_var(
@@ -182,6 +307,15 @@ impl FlussTestingClusterBuilder {
                 self.to_fluss_properties_with(tablet_server_confs),
             );
 
+        // Add port mapping for plaintext listener
+        if let Some(plain_port) = self.plain_client_port {
+            let plain_expose_host_port = (plain_port as i32) + 1 + server_id;
+            image = image.with_mapped_port(
+                plain_expose_host_port as u16,
+                ContainerPort::Tcp(plain_port),
+            );
+        }
+
         // Add volume mount if remote_data_dir is provided
         if let Some(ref remote_data_dir) = self.remote_data_dir {
             use testcontainers::core::Mount;
@@ -210,35 +344,45 @@ impl FlussTestingClusterBuilder {
 
 /// Provides an easy way to launch a Fluss cluster with coordinator and tablet 
servers.
 #[derive(Clone)]
+#[allow(dead_code)] // Fields held for RAII (keeping Docker containers alive).
 pub struct FlussTestingCluster {
     zookeeper: Arc<ContainerAsync<GenericImage>>,
     coordinator_server: Arc<ContainerAsync<GenericImage>>,
     tablet_servers: HashMap<i32, Arc<ContainerAsync<GenericImage>>>,
+    /// Bootstrap servers for plaintext connections.
+    /// When dual listeners are configured, this points to the PLAIN_CLIENT 
listener.
     bootstrap_servers: String,
+    /// Bootstrap servers for SASL connections (only set when dual listeners 
are configured).
+    sasl_bootstrap_servers: Option<String>,
     remote_data_dir: Option<std::path::PathBuf>,
+    sasl_users: Vec<(String, String)>,
+    container_names: Vec<String>,
 }
 
 impl FlussTestingCluster {
-    pub async fn stop(&self) {
-        for tablet_server in self.tablet_servers.values() {
-            tablet_server.stop().await.unwrap()
+    /// Synchronously stops and removes all Docker containers and cleans up the
+    /// remote data directory. Safe to call from non-async contexts (e.g. 
atexit).
+    #[allow(dead_code)]
+    pub fn stop(&self) {
+        for name in &self.container_names {
+            let _ = std::process::Command::new("docker")
+                .args(["rm", "-f", name])
+                .output();
         }
-        self.coordinator_server.stop().await.unwrap();
-        self.zookeeper.stop().await.unwrap();
-        if let Some(remote_data_dir) = &self.remote_data_dir {
-            // Try to clean up the remote data directory, but don't fail if it 
can't be deleted.
-            // This can happen in CI environments or if Docker containers are 
still using the directory.
-            // The directory will be cleaned up by the CI system or OS 
eventually.
-            if let Err(e) = tokio::fs::remove_dir_all(remote_data_dir).await {
-                eprintln!(
-                    "Warning: Failed to delete remote data directory: {:?}, 
error: {:?}. \
-                     This is non-fatal and the directory may be cleaned up 
later.",
-                    remote_data_dir, e
-                );
-            }
+        if let Some(ref dir) = self.remote_data_dir {
+            let _ = std::fs::remove_dir_all(dir);
         }
     }
 
+    pub fn sasl_users(&self) -> &[(String, String)] {
+        &self.sasl_users
+    }
+
+    /// Returns the plaintext (non-SASL) bootstrap servers address.
+    pub fn plaintext_bootstrap_servers(&self) -> &str {
+        &self.bootstrap_servers
+    }
+
     pub async fn get_fluss_connection(&self) -> FlussConnection {
         let config = Config {
             writer_acks: "all".to_string(),
@@ -246,6 +390,58 @@ impl FlussTestingCluster {
             ..Default::default()
         };
 
+        self.connect_with_retry(config).await
+    }
+
+    /// Connect with SASL/PLAIN credentials.
+    /// Uses `sasl_bootstrap_servers` when dual listeners are configured.
+    pub async fn get_fluss_connection_with_sasl(
+        &self,
+        username: &str,
+        password: &str,
+    ) -> FlussConnection {
+        let bootstrap = self
+            .sasl_bootstrap_servers
+            .clone()
+            .unwrap_or_else(|| self.bootstrap_servers.clone());
+        let config = Config {
+            writer_acks: "all".to_string(),
+            bootstrap_servers: bootstrap,
+            security_protocol: "sasl".to_string(),
+            security_sasl_mechanism: "PLAIN".to_string(),
+            security_sasl_username: username.to_string(),
+            security_sasl_password: password.to_string(),
+            ..Default::default()
+        };
+
+        self.connect_with_retry(config).await
+    }
+
+    /// Try to connect with SASL/PLAIN credentials, returning the error on 
failure.
+    /// Uses `sasl_bootstrap_servers` when dual listeners are configured.
+    pub async fn try_fluss_connection_with_sasl(
+        &self,
+        username: &str,
+        password: &str,
+    ) -> fluss::error::Result<FlussConnection> {
+        let bootstrap = self
+            .sasl_bootstrap_servers
+            .clone()
+            .unwrap_or_else(|| self.bootstrap_servers.clone());
+        let config = Config {
+            writer_acks: "all".to_string(),
+            bootstrap_servers: bootstrap,
+            security_protocol: "sasl".to_string(),
+            security_sasl_mechanism: "PLAIN".to_string(),
+            security_sasl_username: username.to_string(),
+            security_sasl_password: password.to_string(),
+            ..Default::default()
+        };
+
+        FlussConnection::new(config).await
+    }
+
+    async fn connect_with_retry(&self, config: Config) -> FlussConnection {
         // Retry mechanism: retry for up to 1 minute
         let max_retries = 60; // 60 retry attempts
         let retry_interval = Duration::from_secs(1); // 1 second interval 
between retries
diff --git a/crates/fluss/tests/integration/kv_table.rs 
b/crates/fluss/tests/integration/kv_table.rs
index c101a18..f0e0c57 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -16,42 +16,11 @@
  * limitations under the License.
  */
 
-use parking_lot::RwLock;
-use std::sync::Arc;
-use std::sync::LazyLock;
-
-use crate::integration::fluss_cluster::FlussTestingCluster;
 #[cfg(test)]
-use test_env_helpers::*;
-
-// Module-level shared cluster instance (only for this test file)
-static SHARED_FLUSS_CLUSTER: 
LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>> =
-    LazyLock::new(|| Arc::new(RwLock::new(None)));
-
-#[cfg(test)]
-#[before_all]
-#[after_all]
 mod kv_table_test {
-    use super::SHARED_FLUSS_CLUSTER;
-    use crate::integration::fluss_cluster::FlussTestingCluster;
-    use crate::integration::utils::{
-        create_partitions, create_table, get_cluster, start_cluster, 
stop_cluster,
-    };
+    use crate::integration::utils::{create_partitions, create_table, 
get_shared_cluster};
     use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
     use fluss::row::{GenericRow, InternalRow};
-    use std::sync::Arc;
-
-    fn before_all() {
-        start_cluster("test_kv_table", SHARED_FLUSS_CLUSTER.clone());
-    }
-
-    fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
-        get_cluster(&SHARED_FLUSS_CLUSTER)
-    }
-
-    fn after_all() {
-        stop_cluster(SHARED_FLUSS_CLUSTER.clone());
-    }
 
     fn make_key(id: i32) -> GenericRow<'static> {
         let mut row = GenericRow::new(3);
@@ -61,7 +30,7 @@ mod kv_table_test {
 
     #[tokio::test]
     async fn upsert_delete_and_lookup() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.unwrap();
@@ -200,7 +169,7 @@ mod kv_table_test {
 
     #[tokio::test]
     async fn composite_primary_keys() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.unwrap();
@@ -310,7 +279,7 @@ mod kv_table_test {
     async fn partial_update() {
         use fluss::row::Datum;
 
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -431,7 +400,7 @@ mod kv_table_test {
 
     #[tokio::test]
     async fn partitioned_table_upsert_and_lookup() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -601,7 +570,7 @@ mod kv_table_test {
     async fn all_supported_datatypes() {
         use fluss::row::{Date, Datum, Decimal, Time, TimestampLtz, 
TimestampNtz};
 
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
diff --git a/crates/fluss/tests/integration/log_table.rs 
b/crates/fluss/tests/integration/log_table.rs
index 779ffdd..4aa88ac 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -16,55 +16,21 @@
  * limitations under the License.
  */
 
-use parking_lot::RwLock;
-use std::sync::Arc;
-use std::sync::LazyLock;
-
-use crate::integration::fluss_cluster::FlussTestingCluster;
-#[cfg(test)]
-use test_env_helpers::*;
-
-// Module-level shared cluster instance (only for this test file)
-static SHARED_FLUSS_CLUSTER: 
LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>> =
-    LazyLock::new(|| Arc::new(RwLock::new(None)));
-
 #[cfg(test)]
-#[before_all]
-#[after_all]
 mod table_test {
-    use super::SHARED_FLUSS_CLUSTER;
-    use crate::integration::fluss_cluster::FlussTestingCluster;
-    use crate::integration::utils::{
-        create_partitions, create_table, get_cluster, start_cluster, 
stop_cluster,
-    };
+    use crate::integration::utils::{create_partitions, create_table, 
get_shared_cluster};
     use arrow::array::record_batch;
     use fluss::client::{EARLIEST_OFFSET, FlussTable, TableScan};
     use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
     use fluss::record::ScanRecord;
     use fluss::row::InternalRow;
     use fluss::rpc::message::OffsetSpec;
-    use jiff::Timestamp;
     use std::collections::HashMap;
-    use std::sync::Arc;
-    use std::sync::atomic::AtomicUsize;
-    use std::sync::atomic::Ordering;
     use std::time::Duration;
 
-    fn before_all() {
-        start_cluster("test_table", SHARED_FLUSS_CLUSTER.clone());
-    }
-
-    fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
-        get_cluster(&SHARED_FLUSS_CLUSTER)
-    }
-
-    fn after_all() {
-        stop_cluster(SHARED_FLUSS_CLUSTER.clone());
-    }
-
     #[tokio::test]
     async fn append_record_batch_and_scan() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -174,7 +140,7 @@ mod table_test {
 
     #[tokio::test]
     async fn list_offsets() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -221,8 +187,6 @@ mod table_test {
             "Latest offset should be 0 for empty table"
         );
 
-        let before_append_ms = Timestamp::now().as_millisecond();
-
         // Append some records
         let append_writer = connection
             .get_table(&table_path)
@@ -247,8 +211,6 @@ mod table_test {
 
         tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
 
-        let after_append_ms = Timestamp::now().as_millisecond();
-
         // Test latest offset after appending (should be 3)
         let latest_offsets_after = admin
             .list_offsets(&table_path, &[0], OffsetSpec::Latest)
@@ -273,34 +235,65 @@ mod table_test {
             "Earliest offset should still be 0"
         );
 
-        // Test list_offsets_by_timestamp
+        // Scan records back to get server-assigned timestamps (avoids 
host/container
+        // clock skew issues that make host-based timestamps unreliable).
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+        let log_scanner = table
+            .new_scan()
+            .create_log_scanner()
+            .expect("Failed to create log scanner");
+        log_scanner
+            .subscribe(0, EARLIEST_OFFSET)
+            .await
+            .expect("Failed to subscribe");
+
+        let mut record_timestamps: Vec<i64> = Vec::new();
+        let scan_start = std::time::Instant::now();
+        while record_timestamps.len() < 3 && scan_start.elapsed() < 
Duration::from_secs(10) {
+            let scan_records = log_scanner
+                .poll(Duration::from_millis(500))
+                .await
+                .expect("Failed to poll records");
+            for rec in scan_records {
+                record_timestamps.push(rec.timestamp());
+            }
+        }
+        assert_eq!(record_timestamps.len(), 3, "Expected 3 record timestamps");
+
+        let min_ts = *record_timestamps.iter().min().unwrap();
+        let max_ts = *record_timestamps.iter().max().unwrap();
 
-        let timestamp_offsets = admin
-            .list_offsets(&table_path, &[0], 
OffsetSpec::Timestamp(before_append_ms))
+        // Timestamp before all records should resolve to offset 0
+        let before_offsets = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Timestamp(min_ts - 1))
             .await
-            .expect("Failed to list offsets by timestamp");
+            .expect("Failed to list offsets by timestamp (before)");
 
         assert_eq!(
-            timestamp_offsets.get(&0),
+            before_offsets.get(&0),
             Some(&0),
-            "Timestamp before append should resolve to offset 0 (start of new 
data)"
+            "Timestamp before first record should resolve to offset 0"
         );
 
-        let timestamp_offsets = admin
-            .list_offsets(&table_path, &[0], 
OffsetSpec::Timestamp(after_append_ms))
+        // Timestamp after all records should resolve to offset 3
+        let after_offsets = admin
+            .list_offsets(&table_path, &[0], OffsetSpec::Timestamp(max_ts + 1))
             .await
-            .expect("Failed to list offsets by timestamp");
+            .expect("Failed to list offsets by timestamp (after)");
 
         assert_eq!(
-            timestamp_offsets.get(&0),
+            after_offsets.get(&0),
             Some(&3),
-            "Timestamp after append should resolve to offset 0 (no newer 
records)"
+            "Timestamp after last record should resolve to offset 3"
         );
     }
 
     #[tokio::test]
     async fn test_project() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -456,7 +449,7 @@ mod table_test {
 
     #[tokio::test]
     async fn test_poll_batches() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
@@ -588,7 +581,7 @@ mod table_test {
     async fn all_supported_datatypes() {
         use fluss::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz, 
TimestampNtz};
 
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -1019,7 +1012,7 @@ mod table_test {
 
     #[tokio::test]
     async fn partitioned_table_append_scan() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -1314,7 +1307,7 @@ mod table_test {
 
     #[tokio::test]
     async fn undersized_row_returns_error() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
diff --git a/crates/fluss/tests/integration/sasl_auth.rs 
b/crates/fluss/tests/integration/sasl_auth.rs
new file mode 100644
index 0000000..878c983
--- /dev/null
+++ b/crates/fluss/tests/integration/sasl_auth.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(test)]
+mod sasl_auth_test {
+    use crate::integration::utils::get_shared_cluster;
+    use fluss::client::FlussConnection;
+    use fluss::config::Config;
+    use fluss::error::FlussError;
+    use fluss::metadata::DatabaseDescriptorBuilder;
+
+    const SASL_USERNAME: &str = "admin";
+    const SASL_PASSWORD: &str = "admin-secret";
+
+    /// Verify that a client with correct SASL credentials can connect and 
perform operations.
+    #[tokio::test]
+    async fn test_sasl_connect_with_valid_credentials() {
+        let cluster = get_shared_cluster();
+        let connection = cluster
+            .get_fluss_connection_with_sasl(SASL_USERNAME, SASL_PASSWORD)
+            .await;
+
+        let admin = connection
+            .get_admin()
+            .await
+            .expect("Should get admin with valid SASL credentials");
+
+        // Perform a basic operation to confirm the connection is fully 
functional
+        let db_name = "sasl_test_valid_db";
+        let descriptor = DatabaseDescriptorBuilder::default()
+            .comment("created via SASL auth")
+            .build();
+
+        admin
+            .create_database(db_name, Some(&descriptor), true)
+            .await
+            .expect("Should create database with SASL auth");
+
+        assert!(admin.database_exists(db_name).await.unwrap());
+
+        // Cleanup
+        admin
+            .drop_database(db_name, true, true)
+            .await
+            .expect("Should drop database");
+    }
+
+    /// Verify that a second user can also authenticate successfully.
+    #[tokio::test]
+    async fn test_sasl_connect_with_second_user() {
+        let cluster = get_shared_cluster();
+        let connection = cluster
+            .get_fluss_connection_with_sasl("alice", "alice-secret")
+            .await;
+
+        let admin = connection
+            .get_admin()
+            .await
+            .expect("Should get admin with alice credentials");
+
+        // Basic operation to confirm functional connection
+        assert!(
+            admin
+                .database_exists("some_nonexistent_db_alice")
+                .await
+                .is_ok()
+        );
+    }
+
+    /// Verify that wrong credentials are rejected with a typed 
AuthenticateException error.
+    #[tokio::test]
+    async fn test_sasl_connect_with_wrong_password() {
+        let cluster = get_shared_cluster();
+        let result = cluster
+            .try_fluss_connection_with_sasl(SASL_USERNAME, "wrong-password")
+            .await;
+
+        let err = match result {
+            Err(e) => e,
+            Ok(_) => panic!("Connection with wrong password should fail"),
+        };
+
+        // The server error code must be preserved (not wrapped in a generic 
string).
+        // Code 46 = AuthenticateException — this is what C++ and Python 
bindings
+        // use to distinguish auth failures from network errors.
+        assert_eq!(
+            err.api_error(),
+            Some(FlussError::AuthenticateException),
+            "Wrong password should produce AuthenticateException, got: {err}"
+        );
+    }
+
+    /// Verify that a SASL-configured client fails when connecting to a 
plaintext server.
+    #[tokio::test]
+    async fn test_sasl_client_to_plaintext_server() {
+        let cluster = get_shared_cluster();
+        let plaintext_addr = cluster.plaintext_bootstrap_servers().to_string();
+
+        let config = Config {
+            writer_acks: "all".to_string(),
+            bootstrap_servers: plaintext_addr,
+            security_protocol: "sasl".to_string(),
+            security_sasl_mechanism: "PLAIN".to_string(),
+            security_sasl_username: SASL_USERNAME.to_string(),
+            security_sasl_password: SASL_PASSWORD.to_string(),
+            ..Default::default()
+        };
+
+        let result = FlussConnection::new(config).await;
+        assert!(
+            result.is_err(),
+            "SASL client connecting to plaintext server should fail"
+        );
+    }
+
+    /// Verify that a nonexistent user is rejected with a typed error.
+    #[tokio::test]
+    async fn test_sasl_connect_with_unknown_user() {
+        let cluster = get_shared_cluster();
+        let result = cluster
+            .try_fluss_connection_with_sasl("nonexistent_user", 
"some-password")
+            .await;
+
+        let err = match result {
+            Err(e) => e,
+            Ok(_) => panic!("Connection with unknown user should fail"),
+        };
+
+        assert_eq!(
+            err.api_error(),
+            Some(FlussError::AuthenticateException),
+            "Unknown user should produce AuthenticateException, got: {err}"
+        );
+    }
+}
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs 
b/crates/fluss/tests/integration/table_remote_scan.rs
index fcd6773..52b8974 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -15,107 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-use crate::integration::fluss_cluster::FlussTestingCluster;
-use parking_lot::RwLock;
-use std::sync::Arc;
-use std::sync::LazyLock;
 
 #[cfg(test)]
-use test_env_helpers::*;
-
-// Module-level shared cluster instance (only for this test file)
-static SHARED_FLUSS_CLUSTER: 
LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>> =
-    LazyLock::new(|| Arc::new(RwLock::new(None)));
-
-#[cfg(test)]
-#[before_all]
-#[after_all]
 mod table_remote_scan_test {
-    use super::SHARED_FLUSS_CLUSTER;
-    use crate::integration::fluss_cluster::{FlussTestingCluster, 
FlussTestingClusterBuilder};
-    use crate::integration::utils::{
-        create_table, get_cluster, stop_cluster, wait_for_cluster_ready,
-    };
+    use crate::integration::utils::{create_table, get_shared_cluster};
     use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
     use fluss::row::{GenericRow, InternalRow};
-    use std::collections::HashMap;
-    use std::sync::Arc;
-    use std::sync::atomic::AtomicUsize;
-    use std::thread;
     use std::time::Duration;
-    use uuid::Uuid;
-    fn before_all() {
-        // Create a new tokio runtime in a separate thread
-        let cluster_lock = SHARED_FLUSS_CLUSTER.clone();
-        thread::spawn(move || {
-            let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
-            rt.block_on(async {
-                // Create a temporary directory for remote data that can be 
accessed from both
-                // container and host. Use a fixed path so it's the same in 
container and host.
-                // On macOS, Docker Desktop may have issues with /tmp, so we 
use a path in the
-                // current working directory or user's home directory which 
Docker can access.
-                let temp_dir = std::env::current_dir()
-                    .unwrap_or_else(|_| std::path::PathBuf::from("."))
-                    .join("target")
-                    .join(format!("test-remote-data-{}", Uuid::new_v4()));
-
-                // Remove existing directory if it exists to start fresh
-                let _ = std::fs::remove_dir_all(&temp_dir);
-                std::fs::create_dir_all(&temp_dir)
-                    .expect("Failed to create temporary directory for remote 
data");
-                println!("temp_dir: {:?}", temp_dir);
-
-                // Verify directory was created and is accessible
-                if !temp_dir.exists() {
-                    panic!("Remote data directory was not created: {:?}", 
temp_dir);
-                }
-
-                // Get absolute path for Docker mount
-                let temp_dir = temp_dir
-                    .canonicalize()
-                    .expect("Failed to canonicalize remote data directory 
path");
-
-                let mut cluster_conf = HashMap::new();
-                // set to a small size to make data can be tiered to remote
-                cluster_conf.insert("log.segment.file-size".to_string(), 
"120b".to_string());
-                cluster_conf.insert(
-                    "remote.log.task-interval-duration".to_string(),
-                    "1s".to_string(),
-                );
-                // remote.data.dir uses the same path in container and host
-                cluster_conf.insert(
-                    "remote.data.dir".to_string(),
-                    temp_dir.to_string_lossy().to_string(),
-                );
-
-                let cluster = 
FlussTestingClusterBuilder::new_with_cluster_conf(
-                    "test_table_remote",
-                    &cluster_conf,
-                )
-                .with_remote_data_dir(temp_dir)
-                .build()
-                .await;
-                wait_for_cluster_ready(&cluster).await;
-                let mut guard = cluster_lock.write();
-                *guard = Some(cluster);
-            });
-        })
-        .join()
-        .expect("Failed to create cluster");
-    }
-
-    fn after_all() {
-        stop_cluster(SHARED_FLUSS_CLUSTER.clone());
-    }
 
     #[tokio::test]
     async fn test_scan_remote_log() {
-        let cluster = get_fluss_cluster();
+        let cluster = get_shared_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path = TablePath::new("fluss", 
"test_append_record_batch_and_scan");
+        let table_path = TablePath::new("fluss", "test_scan_remote_log");
 
         let table_descriptor = TableDescriptor::builder()
             .schema(
@@ -206,8 +121,4 @@ mod table_remote_scan_test {
             );
         }
     }
-
-    fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
-        get_cluster(&SHARED_FLUSS_CLUSTER)
-    }
 }
diff --git a/crates/fluss/tests/integration/utils.rs 
b/crates/fluss/tests/integration/utils.rs
index ae61d3a..b53abc8 100644
--- a/crates/fluss/tests/integration/utils.rs
+++ b/crates/fluss/tests/integration/utils.rs
@@ -18,20 +18,97 @@
 use crate::integration::fluss_cluster::{FlussTestingCluster, 
FlussTestingClusterBuilder};
 use fluss::client::FlussAdmin;
 use fluss::metadata::{PartitionSpec, TableDescriptor, TablePath};
-use parking_lot::RwLock;
 use std::collections::HashMap;
 use std::sync::Arc;
+use std::sync::LazyLock;
 use std::time::Duration;
 
-/// Polls the cluster until CoordinatorEventProcessor is initialized and 
tablet server is available.
-/// Times out after 20 seconds.
-pub async fn wait_for_cluster_ready(cluster: &FlussTestingCluster) {
-    let timeout = Duration::from_secs(20);
+extern "C" fn cleanup_on_exit() {
+    SHARED_CLUSTER.stop();
+}
+
+/// Shared cluster with dual listeners: PLAIN_CLIENT (plaintext) on port 9223
+/// and CLIENT (SASL) on port 9123. Includes remote storage config so
+/// table_remote_scan can also use this cluster.
+static SHARED_CLUSTER: LazyLock<FlussTestingCluster> = LazyLock::new(|| {
+    std::thread::spawn(|| {
+        let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
+        rt.block_on(async {
+            let temp_dir = std::env::current_dir()
+                .unwrap_or_else(|_| std::path::PathBuf::from("."))
+                .join("target")
+                .join(format!("test-remote-data-{}", uuid::Uuid::new_v4()));
+            let _ = std::fs::remove_dir_all(&temp_dir);
+            std::fs::create_dir_all(&temp_dir)
+                .expect("Failed to create temporary directory for remote 
data");
+            let temp_dir = temp_dir
+                .canonicalize()
+                .expect("Failed to canonicalize remote data directory path");
+
+            let mut cluster_conf = HashMap::new();
+            cluster_conf.insert("log.segment.file-size".to_string(), 
"120b".to_string());
+            cluster_conf.insert(
+                "remote.log.task-interval-duration".to_string(),
+                "1s".to_string(),
+            );
+
+            let cluster =
+                
FlussTestingClusterBuilder::new_with_cluster_conf("shared-test", &cluster_conf)
+                    .with_sasl(vec![
+                        ("admin".to_string(), "admin-secret".to_string()),
+                        ("alice".to_string(), "alice-secret".to_string()),
+                    ])
+                    .with_remote_data_dir(temp_dir)
+                    .build()
+                    .await;
+            wait_for_cluster_ready_with_sasl(&cluster).await;
+
+            // Register cleanup so containers are removed on process exit.
+            unsafe {
+                unsafe extern "C" {
+                    fn atexit(f: extern "C" fn()) -> std::os::raw::c_int;
+                }
+                atexit(cleanup_on_exit);
+            }
+
+            cluster
+        })
+    })
+    .join()
+    .expect("Failed to initialize shared cluster")
+});
+
+/// Returns an `Arc` to the shared test cluster.
+pub fn get_shared_cluster() -> Arc<FlussTestingCluster> {
+    Arc::new(SHARED_CLUSTER.clone())
+}
+
+pub async fn create_table(
+    admin: &FlussAdmin,
+    table_path: &TablePath,
+    table_descriptor: &TableDescriptor,
+) {
+    admin
+        .create_table(table_path, table_descriptor, false)
+        .await
+        .expect("Failed to create table");
+}
+
+/// Similar to wait_for_cluster_ready but connects with SASL credentials.
+pub async fn wait_for_cluster_ready_with_sasl(cluster: &FlussTestingCluster) {
+    let timeout = Duration::from_secs(30);
     let poll_interval = Duration::from_millis(500);
     let start = std::time::Instant::now();
 
+    let (username, password) = cluster
+        .sasl_users()
+        .first()
+        .expect("SASL cluster must have at least one user");
+
     loop {
-        let connection = cluster.get_fluss_connection().await;
+        let connection = cluster
+            .get_fluss_connection_with_sasl(username, password)
+            .await;
         if connection.get_admin().await.is_ok()
             && connection
                 .get_metadata()
@@ -44,7 +121,7 @@ pub async fn wait_for_cluster_ready(cluster: 
&FlussTestingCluster) {
 
         if start.elapsed() >= timeout {
             panic!(
-                "Server readiness check timed out after {} seconds. \
+                "SASL server readiness check timed out after {} seconds. \
                  CoordinatorEventProcessor may not be initialized or 
TabletServer may not be available.",
                 timeout.as_secs()
             );
@@ -54,56 +131,6 @@ pub async fn wait_for_cluster_ready(cluster: 
&FlussTestingCluster) {
     }
 }
 
-pub async fn create_table(
-    admin: &FlussAdmin,
-    table_path: &TablePath,
-    table_descriptor: &TableDescriptor,
-) {
-    admin
-        .create_table(&table_path, &table_descriptor, false)
-        .await
-        .expect("Failed to create table");
-}
-
-pub fn start_cluster(name: &str, cluster_lock: 
Arc<RwLock<Option<FlussTestingCluster>>>) {
-    let name = name.to_string();
-    std::thread::spawn(move || {
-        let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
-        rt.block_on(async {
-            let cluster = FlussTestingClusterBuilder::new(&name).build().await;
-            wait_for_cluster_ready(&cluster).await;
-            let mut guard = cluster_lock.write();
-            *guard = Some(cluster);
-        });
-    })
-    .join()
-    .expect("Failed to create cluster");
-}
-
-pub fn stop_cluster(cluster_lock: Arc<RwLock<Option<FlussTestingCluster>>>) {
-    std::thread::spawn(move || {
-        let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
-        rt.block_on(async {
-            let mut guard = cluster_lock.write();
-            if let Some(cluster) = guard.take() {
-                cluster.stop().await;
-            }
-        });
-    })
-    .join()
-    .expect("Failed to cleanup cluster");
-}
-
-pub fn get_cluster(cluster_lock: &RwLock<Option<FlussTestingCluster>>) -> 
Arc<FlussTestingCluster> {
-    let guard = cluster_lock.read();
-    Arc::new(
-        guard
-            .as_ref()
-            .expect("Fluss cluster not initialized. Make sure before_all() was 
called.")
-            .clone(),
-    )
-}
-
 /// Creates partitions for a partitioned table.
 ///
 /// # Arguments
diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs
index a6cc27a..9675646 100644
--- a/crates/fluss/tests/test_fluss.rs
+++ b/crates/fluss/tests/test_fluss.rs
@@ -24,6 +24,7 @@ mod integration {
     mod fluss_cluster;
     mod kv_table;
     mod log_table;
+    mod sasl_auth;
 
     mod utils;
 
diff --git a/website/docs/user-guide/cpp/error-handling.md 
b/website/docs/user-guide/cpp/error-handling.md
index 76b03e3..3ded0c2 100644
--- a/website/docs/user-guide/cpp/error-handling.md
+++ b/website/docs/user-guide/cpp/error-handling.md
@@ -94,6 +94,7 @@ if (!result.Ok()) {
 | `ErrorCode::PARTITION_ALREADY_EXISTS`         | 42   | Partition already 
exists            |
 | `ErrorCode::PARTITION_SPEC_INVALID_EXCEPTION` | 43   | Invalid partition 
spec              |
 | `ErrorCode::LEADER_NOT_AVAILABLE_EXCEPTION`   | 44   | No leader available 
for partition   |
+| `ErrorCode::AUTHENTICATE_EXCEPTION`           | 46   | Authentication failed 
(bad credentials) |
 
 See `fluss::ErrorCode` in `fluss.hpp` for the full list of named constants.
 
@@ -147,6 +148,26 @@ if (!result.Ok()) {
 }
 ```
 
+### Authentication Failed
+
+SASL credentials are incorrect or the user does not exist:
+
+```cpp
+fluss::Configuration config;
+config.bootstrap_servers = "127.0.0.1:9123";
+config.security_protocol = "sasl";
+config.security_sasl_username = "admin";
+config.security_sasl_password = "wrong-password";
+
+fluss::Connection conn;
+fluss::Result result = fluss::Connection::Create(config, conn);
+if (!result.Ok()) {
+    if (result.error_code == fluss::ErrorCode::AUTHENTICATE_EXCEPTION) {
+        std::cerr << "Authentication failed: " << result.error_message << 
std::endl;
+    }
+}
+```
+
 ### Schema Mismatch
 
 Using incorrect types or column indices when writing:
diff --git a/website/docs/user-guide/cpp/example/configuration.md 
b/website/docs/user-guide/cpp/example/configuration.md
index 2245ee1..f4b6309 100644
--- a/website/docs/user-guide/cpp/example/configuration.md
+++ b/website/docs/user-guide/cpp/example/configuration.md
@@ -25,15 +25,32 @@ All fields have sensible defaults. Only `bootstrap_servers` 
typically needs to b
 
 ```cpp
 fluss::Configuration config;
-config.bootstrap_servers = "127.0.0.1:9123";    // Coordinator address
-config.writer_request_max_size = 10 * 1024 * 1024;     // Max request size (10 
MB)
-config.writer_acks = "all";                      // Wait for all replicas
-config.writer_retries = std::numeric_limits<int32_t>::max();  // Retry on 
failure
-config.writer_batch_size = 2 * 1024 * 1024;     // Batch size (2 MB)
-config.writer_batch_timeout_ms = 100;           // Max time to wait for a 
batch to fill
-config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin"
-config.scanner_remote_log_prefetch_num = 4;      // Remote log prefetch count
-config.remote_file_download_thread_num = 3;  // Download threads
-config.scanner_remote_log_read_concurrency = 4;   // In-file remote log read 
concurrency
-config.scanner_log_max_poll_records = 500;        // Max records returned per 
poll()
+config.bootstrap_servers = "127.0.0.1:9123";                  // Coordinator 
address
+config.writer_request_max_size = 10 * 1024 * 1024;            // Max request 
size (10 MB)
+config.writer_acks = "all";                                    // Wait for all 
replicas
+config.writer_retries = std::numeric_limits<int32_t>::max();   // Retry on 
failure
+config.writer_batch_size = 2 * 1024 * 1024;                   // Batch size (2 
MB)
+config.writer_batch_timeout_ms = 100;                          // Max time to 
wait for a batch to fill
+config.writer_bucket_no_key_assigner = "sticky";               // "sticky" or 
"round_robin"
+config.scanner_remote_log_prefetch_num = 4;                    // Remote log 
prefetch count
+config.remote_file_download_thread_num = 3;                    // Download 
threads
+config.scanner_remote_log_read_concurrency = 4;                // In-file 
remote log read concurrency
+config.scanner_log_max_poll_records = 500;                     // Max records 
per poll
+config.connect_timeout_ms = 120000;                            // TCP connect 
timeout (ms)
+```
+
+## SASL Authentication
+
+To connect to a Fluss cluster with SASL/PLAIN authentication enabled:
+
+```cpp
+fluss::Configuration config;
+config.bootstrap_servers = "127.0.0.1:9123";
+config.security_protocol = "sasl";
+config.security_sasl_mechanism = "PLAIN";
+config.security_sasl_username = "admin";
+config.security_sasl_password = "admin-secret";
+
+fluss::Connection conn;
+fluss::Result result = fluss::Connection::Create(config, conn);
 ```
diff --git a/website/docs/user-guide/python/error-handling.md 
b/website/docs/user-guide/python/error-handling.md
index 9fa4821..50a9e46 100644
--- a/website/docs/user-guide/python/error-handling.md
+++ b/website/docs/user-guide/python/error-handling.md
@@ -53,6 +53,7 @@ except fluss.FlussError as e:
 | `ErrorCode.PARTITION_ALREADY_EXISTS`         | 42   | Partition already 
exists            |
 | `ErrorCode.PARTITION_SPEC_INVALID_EXCEPTION` | 43   | Invalid partition spec 
             |
 | `ErrorCode.LEADER_NOT_AVAILABLE_EXCEPTION`   | 44   | No leader available 
for partition   |
+| `ErrorCode.AUTHENTICATE_EXCEPTION`           | 46   | Authentication failed 
(bad credentials) |
 
 See `fluss.ErrorCode` for the full list of named constants.
 
@@ -95,6 +96,24 @@ except fluss.FlussError as e:
         print("Partition does not exist, create it first")
 ```
 
+### Authentication Failed
+
+SASL credentials are incorrect or the user does not exist.
+
+```python
+try:
+    config = fluss.Config({
+        "bootstrap.servers": "127.0.0.1:9123",
+        "client.security.protocol": "sasl",
+        "client.security.sasl.username": "admin",
+        "client.security.sasl.password": "wrong-password",
+    })
+    conn = await fluss.FlussConnection.create(config)
+except fluss.FlussError as e:
+    if e.error_code == fluss.ErrorCode.AUTHENTICATE_EXCEPTION:
+        print(f"Authentication failed: {e.message}")
+```
+
 ### Schema Mismatch
 
 Row data doesn't match the table schema.
diff --git a/website/docs/user-guide/python/example/configuration.md 
b/website/docs/user-guide/python/example/configuration.md
index 39c53be..90b1249 100644
--- a/website/docs/user-guide/python/example/configuration.md
+++ b/website/docs/user-guide/python/example/configuration.md
@@ -28,12 +28,34 @@ with await fluss.FlussConnection.create(config) as conn:
 | `writer.acks`                         | Acknowledgment setting (`all` waits 
for all replicas)                                 | `all`              |
 | `writer.retries`                      | Number of retries on failure         
                                                 | `2147483647`       |
 | `writer.batch-size`                   | Batch size for writes in bytes       
                                                 | `2097152` (2 MB)   |
-| `writer.batch-timeout-ms`             | The maximum time to wait for a 
writer batch to fill up before sending.               | `100`              |
-| `writer.bucket.no-key-assigner`       | Bucket assignment strategy for 
tables without bucket keys: `sticky` or `round_robin` | `sticky`           |
+| `writer.batch-timeout-ms`             | The maximum time to wait for a 
writer batch to fill up before sending.                | `100`              |
+| `writer.bucket.no-key-assigner`       | Bucket assignment strategy for 
tables without bucket keys: `sticky` or `round_robin`  | `sticky`           |
 | `scanner.remote-log.prefetch-num`     | Number of remote log segments to 
prefetch                                             | `4`                |
 | `remote-file.download-thread-num`     | Number of threads for remote log 
downloads                                            | `3`                |
 | `scanner.remote-log.read-concurrency` | Streaming read concurrency within a 
remote log file                                   | `4`                |
 | `scanner.log.max-poll-records`        | Max records returned in a single 
poll()                                               | `500`              |
+| `connect-timeout`                     | TCP connect timeout in milliseconds  
                                                 | `120000`           |
+| `security.protocol`                   | `PLAINTEXT` (default) or `sasl` for 
SASL auth                                        | `PLAINTEXT`        |
+| `security.sasl.mechanism`             | SASL mechanism (only `PLAIN` is 
supported)                                            | `PLAIN`            |
+| `security.sasl.username`              | SASL username (required when 
protocol is `sasl`)                                      | (empty)            |
+| `security.sasl.password`              | SASL password (required when 
protocol is `sasl`)                                      | (empty)            |
+
+## SASL Authentication
+
+To connect to a Fluss cluster with SASL/PLAIN authentication enabled:
+
+```python
+config = fluss.Config({
+    "bootstrap.servers": "127.0.0.1:9123",
+    "security.protocol": "sasl",
+    "security.sasl.mechanism": "PLAIN",
+    "security.sasl.username": "admin",
+    "security.sasl.password": "admin-secret",
+})
+conn = await fluss.FlussConnection.create(config)
+```
+
+## Connection Lifecycle
 
 Remember to close the connection when done:
 
diff --git a/website/docs/user-guide/rust/error-handling.md 
b/website/docs/user-guide/rust/error-handling.md
index 35ede6c..964f81f 100644
--- a/website/docs/user-guide/rust/error-handling.md
+++ b/website/docs/user-guide/rust/error-handling.md
@@ -71,6 +71,9 @@ match result {
     Err(ref e) if e.api_error() == 
Some(FlussError::LeaderNotAvailableException) => {
         eprintln!("Leader not available: {}", e);
     }
+    Err(ref e) if e.api_error() == Some(FlussError::AuthenticateException) => {
+        eprintln!("Authentication failed: {}", e);
+    }
     _ => {}
 }
 ```
@@ -133,6 +136,22 @@ match result {
 }
 ```
 
+### Authentication Failed
+
+SASL credentials are incorrect or the user does not exist.
+
+```rust
+use fluss::error::{Error, FlussError};
+
+let result = FlussConnection::new(config).await;
+match result {
+    Err(ref e) if e.api_error() == Some(FlussError::AuthenticateException) => {
+        eprintln!("Authentication failed: {}", e);
+    }
+    _ => {}
+}
+```
+
 ### Schema Mismatch
 
 Row data does not match the expected table schema.
diff --git a/website/docs/user-guide/rust/example/configuration.md 
b/website/docs/user-guide/rust/example/configuration.md
index a2f52dc..f6340c9 100644
--- a/website/docs/user-guide/rust/example/configuration.md
+++ b/website/docs/user-guide/rust/example/configuration.md
@@ -17,16 +17,36 @@ let conn = FlussConnection::new(config).await?;
 
 ## Connection Configurations
 
-| Option                          | Description                                
                                          | Default          |
-|---------------------------------|--------------------------------------------------------------------------------------|------------------|
-| `bootstrap_servers`             | Coordinator server address                 
                                          | `127.0.0.1:9123` |
-| `writer_request_max_size`       | Maximum request size in bytes              
                                          | 10 MB            |
-| `writer_acks`                   | Acknowledgment setting (`all` waits for 
all replicas)                                | `all`            |
-| `writer_retries`                | Number of retries on failure               
                                          | `i32::MAX`       |
-| `writer_batch_size`             | Batch size for writes                      
                                          | 2 MB             |
-| `writer_batch_timeout_ms`       | The maximum time to wait for a writer 
batch to fill up before sending.               | `100`            |
-| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables 
without bucket keys: `sticky` or `round_robin` | `sticky`         |
-| `scanner_remote_log_prefetch_num` | Number of remote log segments to 
prefetch                                           | `4`              |
-| `remote_file_download_thread_num` | Number of concurrent remote log file 
downloads                                      | `3`              |
-| `scanner_remote_log_read_concurrency` | Streaming read concurrency within a 
remote log file                           | `4`              |
-| `scanner_log_max_poll_records`  | Maximum records returned in a single 
`poll()`                                       | `500`            |
+| Option                                | Description                          
                                                | Default          |
+|---------------------------------------|--------------------------------------------------------------------------------------|------------------|
+| `bootstrap_servers`                   | Coordinator server address           
                                                | `127.0.0.1:9123` |
+| `writer_request_max_size`             | Maximum request size in bytes        
                                                | 10 MB            |
+| `writer_acks`                         | Acknowledgment setting (`all` waits 
for all replicas)                                | `all`            |
+| `writer_retries`                      | Number of retries on failure         
                                                | `i32::MAX`       |
+| `writer_batch_size`                   | Batch size for writes                
                                                | 2 MB             |
+| `writer_batch_timeout_ms`             | The maximum time to wait for a 
writer batch to fill up before sending.               | `100`            |
+| `writer_bucket_no_key_assigner`       | Bucket assignment strategy for 
tables without bucket keys: `sticky` or `round_robin` | `sticky`         |
+| `scanner_remote_log_prefetch_num`     | Number of remote log segments to 
prefetch                                            | `4`              |
+| `remote_file_download_thread_num`     | Number of concurrent remote log file 
downloads                                       | `3`              |
+| `scanner_remote_log_read_concurrency` | Streaming read concurrency within a 
remote log file                                  | `4`              |
+| `scanner_log_max_poll_records`        | Maximum records returned in a single 
`poll()`                                        | `500`            |
+| `connect_timeout_ms`                  | TCP connect timeout in milliseconds  
                                                | 120000           |
+| `security_protocol`                   | `PLAINTEXT` (default) or `sasl` for 
SASL auth                                       | `PLAINTEXT`      |
+| `security_sasl_mechanism`             | SASL mechanism (only `PLAIN` is 
supported)                                           | `PLAIN`          |
+| `security_sasl_username`              | SASL username (required when 
protocol is `sasl`)                                     | (empty)          |
+| `security_sasl_password`              | SASL password (required when 
protocol is `sasl`)                                     | (empty)          |
+
+## SASL Authentication
+
+To connect to a Fluss cluster with SASL/PLAIN authentication enabled:
+
+```rust
+let mut config = Config::default();
+config.bootstrap_servers = "127.0.0.1:9123".to_string();
+config.security_protocol = "sasl".to_string();
+config.security_sasl_mechanism = "PLAIN".to_string();
+config.security_sasl_username = "admin".to_string();
+config.security_sasl_password = "admin-secret".to_string();
+
+let conn = FlussConnection::new(config).await?;
+```

Reply via email to