This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 5f8130f feat: support sasl/plain auth (#375)
5f8130f is described below
commit 5f8130f26957a667eb6b9b33cf3b83841e87e5f2
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Feb 28 11:40:02 2026 +0000
feat: support sasl/plain auth (#375)
---
.github/workflows/build_and_test_rust.yml | 2 +-
bindings/cpp/include/fluss.hpp | 10 +
bindings/cpp/src/ffi_converter.hpp | 5 +
bindings/cpp/src/lib.rs | 13 +
bindings/cpp/test/test_sasl_auth.cpp | 125 +++++++++
bindings/cpp/test/test_utils.h | 151 +++++++----
bindings/python/src/config.rs | 77 ++++++
bindings/python/test/conftest.py | 57 +++-
bindings/python/test/test_sasl_auth.py | 108 ++++++++
crates/fluss/src/client/connection.rs | 18 +-
crates/fluss/src/config.rs | 166 +++++++++++-
crates/fluss/src/proto/fluss_api.proto | 11 +-
crates/fluss/src/rpc/api_key.rs | 4 +
crates/fluss/src/rpc/message/authenticate.rs | 86 ++++++
crates/fluss/src/rpc/message/mod.rs | 2 +
crates/fluss/src/rpc/server_connection.rs | 107 +++++++-
crates/fluss/tests/integration/admin.rs | 52 +---
crates/fluss/tests/integration/fluss_cluster.rs | 290 +++++++++++++++++----
crates/fluss/tests/integration/kv_table.rs | 43 +--
crates/fluss/tests/integration/log_table.rs | 107 ++++----
crates/fluss/tests/integration/sasl_auth.rs | 149 +++++++++++
.../fluss/tests/integration/table_remote_scan.rs | 95 +------
crates/fluss/tests/integration/utils.rs | 141 ++++++----
crates/fluss/tests/test_fluss.rs | 1 +
website/docs/user-guide/cpp/error-handling.md | 21 ++
.../docs/user-guide/cpp/example/configuration.md | 39 ++-
website/docs/user-guide/python/error-handling.md | 19 ++
.../user-guide/python/example/configuration.md | 26 +-
website/docs/user-guide/rust/error-handling.md | 19 ++
.../docs/user-guide/rust/example/configuration.md | 46 +++-
30 files changed, 1560 insertions(+), 430 deletions(-)
diff --git a/.github/workflows/build_and_test_rust.yml
b/.github/workflows/build_and_test_rust.yml
index c9e05b7..1bf7bc5 100644
--- a/.github/workflows/build_and_test_rust.yml
+++ b/.github/workflows/build_and_test_rust.yml
@@ -79,7 +79,7 @@ jobs:
- name: Integration Test (Linux only)
if: runner.os == 'Linux'
run: |
- RUST_TEST_THREADS=1 cargo test --features integration_tests
--all-targets --workspace --exclude fluss_python --exclude fluss-cpp --
--nocapture
+ cargo test --features integration_tests --all-targets --workspace
--exclude fluss_python --exclude fluss-cpp
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index cb06028..b507da7 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -998,6 +998,16 @@ struct Configuration {
// Maximum number of records returned in a single call to Poll() for
LogScanner
size_t scanner_log_max_poll_records{500};
int64_t writer_batch_timeout_ms{100};
+ // Timeout in milliseconds for establishing the TCP transport connection
+ uint64_t connect_timeout_ms{120000};
+ // Security protocol: "PLAINTEXT" (default, no auth) or "sasl" (SASL auth)
+ std::string security_protocol{"PLAINTEXT"};
+ // SASL mechanism (only "PLAIN" is supported)
+ std::string security_sasl_mechanism{"PLAIN"};
+ // SASL username (required when security_protocol is "sasl")
+ std::string security_sasl_username;
+ // SASL password (required when security_protocol is "sasl")
+ std::string security_sasl_password;
};
class Connection {
diff --git a/bindings/cpp/src/ffi_converter.hpp
b/bindings/cpp/src/ffi_converter.hpp
index 9020027..3375761 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -57,6 +57,11 @@ inline ffi::FfiConfig to_ffi_config(const Configuration&
config) {
ffi_config.scanner_remote_log_read_concurrency =
config.scanner_remote_log_read_concurrency;
ffi_config.scanner_log_max_poll_records =
config.scanner_log_max_poll_records;
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
+ ffi_config.connect_timeout_ms = config.connect_timeout_ms;
+ ffi_config.security_protocol = rust::String(config.security_protocol);
+ ffi_config.security_sasl_mechanism =
rust::String(config.security_sasl_mechanism);
+ ffi_config.security_sasl_username =
rust::String(config.security_sasl_username);
+ ffi_config.security_sasl_password =
rust::String(config.security_sasl_password);
return ffi_config;
}
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index d26af6a..c310fc8 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -49,6 +49,11 @@ mod ffi {
scanner_remote_log_read_concurrency: usize,
scanner_log_max_poll_records: usize,
writer_batch_timeout_ms: i64,
+ connect_timeout_ms: u64,
+ security_protocol: String,
+ security_sasl_mechanism: String,
+ security_sasl_username: String,
+ security_sasl_password: String,
}
struct FfiResult {
@@ -258,6 +263,9 @@ mod ffi {
type LookupResultInner;
// Connection
+ // TODO: all Result<*mut T> methods lose server error codes (mapped to
CLIENT_ERROR).
+ // Fix by introducing a struct such as { result: FfiResult, ptr: i64
} to preserve error
+ // codes from the server, matching how Rust and Python bindings handle
errors.
fn new_connection(config: &FfiConfig) -> Result<*mut Connection>;
unsafe fn delete_connection(conn: *mut Connection);
fn get_admin(self: &Connection) -> Result<*mut Admin>;
@@ -645,6 +653,11 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut
Connection, String> {
remote_file_download_thread_num:
config.remote_file_download_thread_num,
scanner_remote_log_read_concurrency:
config.scanner_remote_log_read_concurrency,
scanner_log_max_poll_records: config.scanner_log_max_poll_records,
+ connect_timeout_ms: config.connect_timeout_ms,
+ security_protocol: config.security_protocol.to_string(),
+ security_sasl_mechanism: config.security_sasl_mechanism.to_string(),
+ security_sasl_username: config.security_sasl_username.to_string(),
+ security_sasl_password: config.security_sasl_password.to_string(),
};
let conn = RUNTIME.block_on(async {
fcore::client::FlussConnection::new(config_core).await });
diff --git a/bindings/cpp/test/test_sasl_auth.cpp
b/bindings/cpp/test/test_sasl_auth.cpp
new file mode 100644
index 0000000..2208db3
--- /dev/null
+++ b/bindings/cpp/test/test_sasl_auth.cpp
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "test_utils.h"
+
+class SaslAuthTest : public ::testing::Test {
+ protected:
+ const std::string& sasl_servers() {
+ return
fluss_test::FlussTestEnvironment::Instance()->GetSaslBootstrapServers();
+ }
+ const std::string& plaintext_servers() {
+ return
fluss_test::FlussTestEnvironment::Instance()->GetBootstrapServers();
+ }
+};
+
+TEST_F(SaslAuthTest, SaslConnectWithValidCredentials) {
+ fluss::Configuration config;
+ config.bootstrap_servers = sasl_servers();
+ config.security_protocol = "sasl";
+ config.security_sasl_mechanism = "PLAIN";
+ config.security_sasl_username = "admin";
+ config.security_sasl_password = "admin-secret";
+
+ fluss::Connection conn;
+ ASSERT_OK(fluss::Connection::Create(config, conn));
+
+ fluss::Admin admin;
+ ASSERT_OK(conn.GetAdmin(admin));
+
+ // Perform a basic operation to confirm the connection is fully functional
+ std::string db_name = "cpp_sasl_test_valid_db";
+ fluss::DatabaseDescriptor descriptor;
+ descriptor.comment = "created via SASL auth";
+ ASSERT_OK(admin.CreateDatabase(db_name, descriptor, true));
+
+ bool exists = false;
+ ASSERT_OK(admin.DatabaseExists(db_name, exists));
+ ASSERT_TRUE(exists);
+
+ ASSERT_OK(admin.DropDatabase(db_name, true, true));
+}
+
+TEST_F(SaslAuthTest, SaslConnectWithSecondUser) {
+ fluss::Configuration config;
+ config.bootstrap_servers = sasl_servers();
+ config.security_protocol = "sasl";
+ config.security_sasl_mechanism = "PLAIN";
+ config.security_sasl_username = "alice";
+ config.security_sasl_password = "alice-secret";
+
+ fluss::Connection conn;
+ ASSERT_OK(fluss::Connection::Create(config, conn));
+
+ fluss::Admin admin;
+ ASSERT_OK(conn.GetAdmin(admin));
+
+ // Basic operation to confirm functional connection
+ bool exists = false;
+ ASSERT_OK(admin.DatabaseExists("some_nonexistent_db_alice", exists));
+ ASSERT_FALSE(exists);
+}
+
+TEST_F(SaslAuthTest, SaslConnectWithWrongPassword) {
+ fluss::Configuration config;
+ config.bootstrap_servers = sasl_servers();
+ config.security_protocol = "sasl";
+ config.security_sasl_mechanism = "PLAIN";
+ config.security_sasl_username = "admin";
+ config.security_sasl_password = "wrong-password";
+
+ fluss::Connection conn;
+ auto result = fluss::Connection::Create(config, conn);
+ ASSERT_FALSE(result.Ok());
+ // TODO: error_code is CLIENT_ERROR (-2) because CXX Result<*mut T> loses
the server
+ // error code. Should be AUTHENTICATE_EXCEPTION (46) once fixed
+ EXPECT_NE(result.error_message.find("Authentication failed"),
std::string::npos)
+ << "Expected 'Authentication failed' in: " << result.error_message;
+}
+
+TEST_F(SaslAuthTest, SaslConnectWithUnknownUser) {
+ fluss::Configuration config;
+ config.bootstrap_servers = sasl_servers();
+ config.security_protocol = "sasl";
+ config.security_sasl_mechanism = "PLAIN";
+ config.security_sasl_username = "nonexistent_user";
+ config.security_sasl_password = "some-password";
+
+ fluss::Connection conn;
+ auto result = fluss::Connection::Create(config, conn);
+ ASSERT_FALSE(result.Ok());
+ // TODO: same as above — should check error_code == AUTHENTICATE_EXCEPTION
once fixed.
+ EXPECT_NE(result.error_message.find("Authentication failed"),
std::string::npos)
+ << "Expected 'Authentication failed' in: " << result.error_message;
+}
+
+TEST_F(SaslAuthTest, SaslClientToPlaintextServer) {
+ fluss::Configuration config;
+ config.bootstrap_servers = plaintext_servers();
+ config.security_protocol = "sasl";
+ config.security_sasl_mechanism = "PLAIN";
+ config.security_sasl_username = "admin";
+ config.security_sasl_password = "admin-secret";
+
+ fluss::Connection conn;
+ auto result = fluss::Connection::Create(config, conn);
+ ASSERT_FALSE(result.Ok()) << "SASL client connecting to plaintext server
should fail";
+}
diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h
index bae5237..98d119a 100644
--- a/bindings/cpp/test/test_utils.h
+++ b/bindings/cpp/test/test_utils.h
@@ -49,23 +49,46 @@
namespace fluss_test {
-static constexpr const char* kFlussVersion = "0.7.0";
+static constexpr const char* kFlussImage = "apache/fluss";
+static constexpr const char* kFlussVersion = "0.8.0-incubating";
static constexpr const char* kNetworkName = "fluss-cpp-test-network";
static constexpr const char* kZookeeperName = "zookeeper-cpp-test";
static constexpr const char* kCoordinatorName = "coordinator-server-cpp-test";
static constexpr const char* kTabletServerName = "tablet-server-cpp-test";
static constexpr int kCoordinatorPort = 9123;
static constexpr int kTabletServerPort = 9124;
+static constexpr int kPlainClientPort = 9223;
+static constexpr int kPlainClientTabletPort = 9224;
/// Execute a shell command and return its exit code.
-inline int RunCommand(const std::string& cmd) {
- return system(cmd.c_str());
+inline int RunCommand(const std::string& cmd) { return system(cmd.c_str()); }
+
+/// Join property lines with the escaped newline separator used by `printf` in
docker commands.
+inline std::string JoinProps(const std::vector<std::string>& lines) {
+ std::string result;
+ for (size_t i = 0; i < lines.size(); ++i) {
+ if (i > 0) result += "\\n";
+ result += lines[i];
+ }
+ return result;
+}
+
+/// Build a `docker run` command with FLUSS_PROPERTIES.
+inline std::string DockerRunCmd(const std::string& name, const std::string&
props,
+ const std::vector<std::string>& port_mappings,
+ const std::string& server_type) {
+ std::string cmd = "docker run -d --rm --name " + name + " --network " +
kNetworkName;
+ for (const auto& pm : port_mappings) {
+ cmd += " -p " + pm;
+ }
+ cmd += " -e FLUSS_PROPERTIES=\"$(printf '" + props + "')\"";
+ cmd += " " + std::string(kFlussImage) + ":" + kFlussVersion + " " +
server_type;
+ return cmd;
}
/// Wait until a TCP port is accepting connections, or timeout.
inline bool WaitForPort(const std::string& host, int port, int timeout_seconds
= 60) {
- auto deadline =
- std::chrono::steady_clock::now() +
std::chrono::seconds(timeout_seconds);
+ auto deadline = std::chrono::steady_clock::now() +
std::chrono::seconds(timeout_seconds);
while (std::chrono::steady_clock::now() < deadline) {
int sock = socket(AF_INET, SOCK_STREAM, 0);
@@ -114,10 +137,8 @@ class FlussTestCluster {
RunCommand(std::string("docker network create ") + kNetworkName + "
2>/dev/null || true");
// Start ZooKeeper
- std::string zk_cmd = std::string("docker run -d --rm") +
- " --name " + kZookeeperName +
- " --network " + kNetworkName +
- " zookeeper:3.9.2";
+ std::string zk_cmd = std::string("docker run -d --rm") + " --name " +
kZookeeperName +
+ " --network " + kNetworkName + " zookeeper:3.9.2";
if (RunCommand(zk_cmd) != 0) {
std::cerr << "Failed to start ZooKeeper" << std::endl;
return false;
@@ -126,23 +147,29 @@ class FlussTestCluster {
// Wait for ZooKeeper to be ready before starting Fluss servers
std::this_thread::sleep_for(std::chrono::seconds(5));
- // Start Coordinator Server
- std::string coord_props =
- "zookeeper.address: " + std::string(kZookeeperName) + ":2181\\n"
- "bind.listeners: INTERNAL://" + std::string(kCoordinatorName) +
":0, CLIENT://" +
- std::string(kCoordinatorName) + ":9123\\n"
- "advertised.listeners: CLIENT://localhost:9123\\n"
- "internal.listener.name: INTERNAL\\n"
- "netty.server.num-network-threads: 1\\n"
- "netty.server.num-worker-threads: 3";
-
- std::string coord_cmd = std::string("docker run -d --rm") +
- " --name " + kCoordinatorName +
- " --network " + kNetworkName +
- " -p 9123:9123" +
- " -e FLUSS_PROPERTIES=\"$(printf '" +
coord_props + "')\"" +
- " fluss/fluss:" + kFlussVersion +
- " coordinatorServer";
+ // Start Coordinator Server (dual listeners: CLIENT=SASL on 9123,
PLAIN_CLIENT=plaintext on
+ // 9223)
+ std::string sasl_jaas =
+ "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule
required"
+ " user_admin=\"admin-secret\" user_alice=\"alice-secret\";";
+
+ std::string coord = std::string(kCoordinatorName);
+ std::string zk = std::string(kZookeeperName);
+ std::string coord_props = JoinProps({
+ "zookeeper.address: " + zk + ":2181",
+ "bind.listeners: INTERNAL://" + coord + ":0, CLIENT://" + coord +
+ ":9123, PLAIN_CLIENT://" + coord + ":9223",
+ "advertised.listeners: CLIENT://localhost:9123,
PLAIN_CLIENT://localhost:9223",
+ "internal.listener.name: INTERNAL",
+ "security.protocol.map: CLIENT:sasl",
+ "security.sasl.enabled.mechanisms: plain",
+ "security.sasl.plain.jaas.config: " + sasl_jaas,
+ "netty.server.num-network-threads: 1",
+ "netty.server.num-worker-threads: 3",
+ });
+
+ std::string coord_cmd = DockerRunCmd(kCoordinatorName, coord_props,
+ {"9123:9123", "9223:9223"},
"coordinatorServer");
if (RunCommand(coord_cmd) != 0) {
std::cerr << "Failed to start Coordinator Server" << std::endl;
Stop();
@@ -156,24 +183,27 @@ class FlussTestCluster {
return false;
}
- // Start Tablet Server
- std::string ts_props =
- "zookeeper.address: " + std::string(kZookeeperName) + ":2181\\n"
- "bind.listeners: INTERNAL://" + std::string(kTabletServerName) +
":0, CLIENT://" +
- std::string(kTabletServerName) + ":9123\\n"
- "advertised.listeners: CLIENT://localhost:" +
std::to_string(kTabletServerPort) + "\\n"
- "internal.listener.name: INTERNAL\\n"
- "tablet-server.id: 0\\n"
- "netty.server.num-network-threads: 1\\n"
- "netty.server.num-worker-threads: 3";
-
- std::string ts_cmd = std::string("docker run -d --rm") +
- " --name " + kTabletServerName +
- " --network " + kNetworkName +
- " -p " + std::to_string(kTabletServerPort) +
":9123" +
- " -e FLUSS_PROPERTIES=\"$(printf '" + ts_props +
"')\"" +
- " fluss/fluss:" + kFlussVersion +
- " tabletServer";
+ // Start Tablet Server (dual listeners: CLIENT=SASL on 9123,
PLAIN_CLIENT=plaintext on 9223)
+ std::string ts = std::string(kTabletServerName);
+ std::string ts_props = JoinProps({
+ "zookeeper.address: " + zk + ":2181",
+ "bind.listeners: INTERNAL://" + ts + ":0, CLIENT://" + ts +
":9123, PLAIN_CLIENT://" +
+ ts + ":9223",
+ "advertised.listeners: CLIENT://localhost:" +
std::to_string(kTabletServerPort) +
+ ", PLAIN_CLIENT://localhost:" +
std::to_string(kPlainClientTabletPort),
+ "internal.listener.name: INTERNAL",
+ "security.protocol.map: CLIENT:sasl",
+ "security.sasl.enabled.mechanisms: plain",
+ "security.sasl.plain.jaas.config: " + sasl_jaas,
+ "tablet-server.id: 0",
+ "netty.server.num-network-threads: 1",
+ "netty.server.num-worker-threads: 3",
+ });
+
+ std::string ts_cmd = DockerRunCmd(kTabletServerName, ts_props,
+ {std::to_string(kTabletServerPort) +
":9123",
+
std::to_string(kPlainClientTabletPort) + ":9223"},
+ "tabletServer");
if (RunCommand(ts_cmd) != 0) {
std::cerr << "Failed to start Tablet Server" << std::endl;
Stop();
@@ -187,7 +217,20 @@ class FlussTestCluster {
return false;
}
- bootstrap_servers_ = "127.0.0.1:9123";
+ // Wait for plaintext listeners
+ if (!WaitForPort("127.0.0.1", kPlainClientPort)) {
+ std::cerr << "Coordinator plaintext listener did not become ready"
<< std::endl;
+ Stop();
+ return false;
+ }
+ if (!WaitForPort("127.0.0.1", kPlainClientTabletPort)) {
+ std::cerr << "Tablet Server plaintext listener did not become
ready" << std::endl;
+ Stop();
+ return false;
+ }
+
+ bootstrap_servers_ = "127.0.0.1:" + std::to_string(kPlainClientPort);
+ sasl_bootstrap_servers_ = "127.0.0.1:" +
std::to_string(kCoordinatorPort);
std::cout << "Fluss cluster started successfully." << std::endl;
return true;
}
@@ -204,9 +247,11 @@ class FlussTestCluster {
}
const std::string& GetBootstrapServers() const { return
bootstrap_servers_; }
+ const std::string& GetSaslBootstrapServers() const { return
sasl_bootstrap_servers_; }
private:
std::string bootstrap_servers_;
+ std::string sasl_bootstrap_servers_;
bool external_cluster_{false};
};
@@ -230,8 +275,7 @@ class FlussTestEnvironment : public ::testing::Environment {
fluss::Configuration config;
config.bootstrap_servers = cluster_.GetBootstrapServers();
- auto deadline =
- std::chrono::steady_clock::now() + std::chrono::seconds(60);
+ auto deadline = std::chrono::steady_clock::now() +
std::chrono::seconds(60);
while (std::chrono::steady_clock::now() < deadline) {
auto result = fluss::Connection::Create(config, connection_);
if (result.Ok()) {
@@ -247,13 +291,12 @@ class FlussTestEnvironment : public
::testing::Environment {
GTEST_SKIP() << "Fluss cluster did not become ready within timeout.";
}
- void TearDown() override {
- cluster_.Stop();
- }
+ void TearDown() override { cluster_.Stop(); }
fluss::Connection& GetConnection() { return connection_; }
fluss::Admin& GetAdmin() { return admin_; }
const std::string& GetBootstrapServers() { return
cluster_.GetBootstrapServers(); }
+ const std::string& GetSaslBootstrapServers() { return
cluster_.GetSaslBootstrapServers(); }
private:
FlussTestEnvironment() = default;
@@ -286,8 +329,8 @@ inline void CreatePartitions(fluss::Admin& admin, const
fluss::TablePath& path,
/// Poll a LogScanner for ScanRecords until `expected_count` items are
collected or timeout.
/// `extract_fn` is called for each ScanRecord and should return a value of
type T.
template <typename T, typename ExtractFn>
-void PollRecords(fluss::LogScanner& scanner, size_t expected_count,
- ExtractFn extract_fn, std::vector<T>& out) {
+void PollRecords(fluss::LogScanner& scanner, size_t expected_count, ExtractFn
extract_fn,
+ std::vector<T>& out) {
auto deadline = std::chrono::steady_clock::now() +
std::chrono::seconds(10);
while (out.size() < expected_count && std::chrono::steady_clock::now() <
deadline) {
fluss::ScanRecords records;
@@ -301,8 +344,8 @@ void PollRecords(fluss::LogScanner& scanner, size_t
expected_count,
/// Poll a LogScanner for ArrowRecordBatches until `expected_count` items are
collected or timeout.
/// `extract_fn` is called with the full ArrowRecordBatches and should return
a std::vector<T>.
template <typename T, typename ExtractFn>
-void PollRecordBatches(fluss::LogScanner& scanner, size_t expected_count,
- ExtractFn extract_fn, std::vector<T>& out) {
+void PollRecordBatches(fluss::LogScanner& scanner, size_t expected_count,
ExtractFn extract_fn,
+ std::vector<T>& out) {
auto deadline = std::chrono::steady_clock::now() +
std::chrono::seconds(10);
while (out.size() < expected_count && std::chrono::steady_clock::now() <
deadline) {
fluss::ArrowRecordBatches batches;
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index 9c0059e..4582d43 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -108,6 +108,23 @@ impl Config {
}
};
}
+ "connect-timeout" => {
+ config.connect_timeout_ms =
value.parse::<u64>().map_err(|e| {
+ FlussError::new_err(format!("Invalid value
'{value}' for '{key}': {e}"))
+ })?;
+ }
+ "security.protocol" => {
+ config.security_protocol = value;
+ }
+ "security.sasl.mechanism" => {
+ config.security_sasl_mechanism = value;
+ }
+ "security.sasl.username" => {
+ config.security_sasl_username = value;
+ }
+ "security.sasl.password" => {
+ config.security_sasl_password = value;
+ }
_ => {
return Err(FlussError::new_err(format!("Unknown
property: {key}")));
}
@@ -237,6 +254,66 @@ impl Config {
fn set_writer_batch_timeout_ms(&mut self, timeout: i64) {
self.inner.writer_batch_timeout_ms = timeout;
}
+
+ /// Get the connect timeout in milliseconds
+ #[getter]
+ fn connect_timeout_ms(&self) -> u64 {
+ self.inner.connect_timeout_ms
+ }
+
+ /// Set the connect timeout in milliseconds
+ #[setter]
+ fn set_connect_timeout_ms(&mut self, timeout: u64) {
+ self.inner.connect_timeout_ms = timeout;
+ }
+
+ /// Get the security protocol
+ #[getter]
+ fn security_protocol(&self) -> String {
+ self.inner.security_protocol.clone()
+ }
+
+ /// Set the security protocol
+ #[setter]
+ fn set_security_protocol(&mut self, protocol: String) {
+ self.inner.security_protocol = protocol;
+ }
+
+ /// Get the SASL mechanism
+ #[getter]
+ fn security_sasl_mechanism(&self) -> String {
+ self.inner.security_sasl_mechanism.clone()
+ }
+
+ /// Set the SASL mechanism
+ #[setter]
+ fn set_security_sasl_mechanism(&mut self, mechanism: String) {
+ self.inner.security_sasl_mechanism = mechanism;
+ }
+
+ /// Get the SASL username
+ #[getter]
+ fn security_sasl_username(&self) -> String {
+ self.inner.security_sasl_username.clone()
+ }
+
+ /// Set the SASL username
+ #[setter]
+ fn set_security_sasl_username(&mut self, username: String) {
+ self.inner.security_sasl_username = username;
+ }
+
+ /// Get the SASL password
+ #[getter]
+ fn security_sasl_password(&self) -> String {
+ self.inner.security_sasl_password.clone()
+ }
+
+ /// Set the SASL password
+ #[setter]
+ fn set_security_sasl_password(&mut self, password: String) {
+ self.inner.security_sasl_password = password;
+ }
}
impl Config {
diff --git a/bindings/python/test/conftest.py b/bindings/python/test/conftest.py
index fbd7396..0a969e8 100644
--- a/bindings/python/test/conftest.py
+++ b/bindings/python/test/conftest.py
@@ -33,7 +33,8 @@ import pytest_asyncio
import fluss
-FLUSS_VERSION = "0.7.0"
+FLUSS_IMAGE = "apache/fluss"
+FLUSS_VERSION = "0.8.0-incubating"
BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")
@@ -53,7 +54,7 @@ def _wait_for_port(host, port, timeout=60):
def fluss_cluster():
"""Start a Fluss cluster using testcontainers, or use an existing one."""
if BOOTSTRAP_SERVERS_ENV:
- yield BOOTSTRAP_SERVERS_ENV
+ yield (BOOTSTRAP_SERVERS_ENV, BOOTSTRAP_SERVERS_ENV)
return
from testcontainers.core.container import DockerContainer
@@ -68,20 +69,30 @@ def fluss_cluster():
.with_name("zookeeper-python-test")
)
+ sasl_jaas = (
+ "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required"
+ ' user_admin="admin-secret" user_alice="alice-secret";'
+ )
coordinator_props = "\n".join([
"zookeeper.address: zookeeper-python-test:2181",
"bind.listeners: INTERNAL://coordinator-server-python-test:0,"
- " CLIENT://coordinator-server-python-test:9123",
- "advertised.listeners: CLIENT://localhost:9123",
+ " CLIENT://coordinator-server-python-test:9123,"
+ " PLAIN_CLIENT://coordinator-server-python-test:9223",
+ "advertised.listeners: CLIENT://localhost:9123,"
+ " PLAIN_CLIENT://localhost:9223",
"internal.listener.name: INTERNAL",
+ "security.protocol.map: CLIENT:sasl",
+ "security.sasl.enabled.mechanisms: plain",
+ f"security.sasl.plain.jaas.config: {sasl_jaas}",
"netty.server.num-network-threads: 1",
"netty.server.num-worker-threads: 3",
])
coordinator = (
- DockerContainer(f"fluss/fluss:{FLUSS_VERSION}")
+ DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
.with_network(network)
.with_name("coordinator-server-python-test")
.with_bind_ports(9123, 9123)
+ .with_bind_ports(9223, 9223)
.with_command("coordinatorServer")
.with_env("FLUSS_PROPERTIES", coordinator_props)
)
@@ -89,18 +100,24 @@ def fluss_cluster():
tablet_props = "\n".join([
"zookeeper.address: zookeeper-python-test:2181",
"bind.listeners: INTERNAL://tablet-server-python-test:0,"
- " CLIENT://tablet-server-python-test:9123",
- "advertised.listeners: CLIENT://localhost:9124",
+ " CLIENT://tablet-server-python-test:9123,"
+ " PLAIN_CLIENT://tablet-server-python-test:9223",
+ "advertised.listeners: CLIENT://localhost:9124,"
+ " PLAIN_CLIENT://localhost:9224",
"internal.listener.name: INTERNAL",
+ "security.protocol.map: CLIENT:sasl",
+ "security.sasl.enabled.mechanisms: plain",
+ f"security.sasl.plain.jaas.config: {sasl_jaas}",
"tablet-server.id: 0",
"netty.server.num-network-threads: 1",
"netty.server.num-worker-threads: 3",
])
tablet_server = (
- DockerContainer(f"fluss/fluss:{FLUSS_VERSION}")
+ DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
.with_network(network)
.with_name("tablet-server-python-test")
.with_bind_ports(9123, 9124)
+ .with_bind_ports(9223, 9224)
.with_command("tabletServer")
.with_env("FLUSS_PROPERTIES", tablet_props)
)
@@ -111,10 +128,13 @@ def fluss_cluster():
_wait_for_port("localhost", 9123)
_wait_for_port("localhost", 9124)
+ _wait_for_port("localhost", 9223)
+ _wait_for_port("localhost", 9224)
# Extra wait for cluster to fully initialize
time.sleep(10)
- yield "127.0.0.1:9123"
+ # (plaintext_bootstrap, sasl_bootstrap)
+ yield ("127.0.0.1:9223", "127.0.0.1:9123")
tablet_server.stop()
coordinator.stop()
@@ -124,13 +144,28 @@ def fluss_cluster():
@pytest_asyncio.fixture(scope="session")
async def connection(fluss_cluster):
- """Session-scoped connection to the Fluss cluster."""
- config = fluss.Config({"bootstrap.servers": fluss_cluster})
+ """Session-scoped connection to the Fluss cluster (plaintext)."""
+ plaintext_addr, _sasl_addr = fluss_cluster
+ config = fluss.Config({"bootstrap.servers": plaintext_addr})
conn = await fluss.FlussConnection.create(config)
yield conn
conn.close()
[email protected](scope="session")
+def sasl_bootstrap_servers(fluss_cluster):
+ """Bootstrap servers for the SASL listener."""
+ _plaintext_addr, sasl_addr = fluss_cluster
+ return sasl_addr
+
+
[email protected](scope="session")
+def plaintext_bootstrap_servers(fluss_cluster):
+ """Bootstrap servers for the plaintext (non-SASL) listener."""
+ plaintext_addr, _sasl_addr = fluss_cluster
+ return plaintext_addr
+
+
@pytest_asyncio.fixture(scope="session")
async def admin(connection):
"""Session-scoped admin client."""
diff --git a/bindings/python/test/test_sasl_auth.py
b/bindings/python/test/test_sasl_auth.py
new file mode 100644
index 0000000..30fce4c
--- /dev/null
+++ b/bindings/python/test/test_sasl_auth.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Integration tests for SASL/PLAIN authentication.
+
+Mirrors the Rust integration tests in
crates/fluss/tests/integration/sasl_auth.rs.
+"""
+
+import pytest
+
+import fluss
+
+
+async def test_sasl_connect_with_valid_credentials(sasl_bootstrap_servers):
+ """Verify that a client with correct SASL credentials can connect and
perform operations."""
+ config = fluss.Config({
+ "bootstrap.servers": sasl_bootstrap_servers,
+ "security.protocol": "sasl",
+ "security.sasl.mechanism": "PLAIN",
+ "security.sasl.username": "admin",
+ "security.sasl.password": "admin-secret",
+ })
+ conn = await fluss.FlussConnection.create(config)
+ admin = await conn.get_admin()
+
+ db_name = "py_sasl_test_valid_db"
+ db_descriptor = fluss.DatabaseDescriptor(comment="created via SASL auth")
+ await admin.create_database(db_name, db_descriptor, ignore_if_exists=True)
+
+ assert await admin.database_exists(db_name)
+
+ # Cleanup
+ await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True)
+ conn.close()
+
+
+async def test_sasl_connect_with_second_user(sasl_bootstrap_servers):
+ """Verify that a second user can also authenticate successfully."""
+ config = fluss.Config({
+ "bootstrap.servers": sasl_bootstrap_servers,
+ "security.protocol": "sasl",
+ "security.sasl.mechanism": "PLAIN",
+ "security.sasl.username": "alice",
+ "security.sasl.password": "alice-secret",
+ })
+ conn = await fluss.FlussConnection.create(config)
+ admin = await conn.get_admin()
+
+ # Basic operation to confirm functional connection
+ assert not await admin.database_exists("some_nonexistent_db_alice")
+ conn.close()
+
+
+async def test_sasl_connect_with_wrong_password(sasl_bootstrap_servers):
+ """Verify that wrong credentials are rejected with
AUTHENTICATE_EXCEPTION."""
+ config = fluss.Config({
+ "bootstrap.servers": sasl_bootstrap_servers,
+ "security.protocol": "sasl",
+ "security.sasl.mechanism": "PLAIN",
+ "security.sasl.username": "admin",
+ "security.sasl.password": "wrong-password",
+ })
+ with pytest.raises(fluss.FlussError) as exc_info:
+ await fluss.FlussConnection.create(config)
+
+ assert exc_info.value.error_code == fluss.ErrorCode.AUTHENTICATE_EXCEPTION
+
+
+async def test_sasl_connect_with_unknown_user(sasl_bootstrap_servers):
+ """Verify that a nonexistent user is rejected with
AUTHENTICATE_EXCEPTION."""
+ config = fluss.Config({
+ "bootstrap.servers": sasl_bootstrap_servers,
+ "security.protocol": "sasl",
+ "security.sasl.mechanism": "PLAIN",
+ "security.sasl.username": "nonexistent_user",
+ "security.sasl.password": "some-password",
+ })
+ with pytest.raises(fluss.FlussError) as exc_info:
+ await fluss.FlussConnection.create(config)
+
+ assert exc_info.value.error_code == fluss.ErrorCode.AUTHENTICATE_EXCEPTION
+
+
+async def test_sasl_client_to_plaintext_server(plaintext_bootstrap_servers):
+ """Verify that a SASL-configured client fails when connecting to a
plaintext server."""
+ config = fluss.Config({
+ "bootstrap.servers": plaintext_bootstrap_servers,
+ "security.protocol": "sasl",
+ "security.sasl.mechanism": "PLAIN",
+ "security.sasl.username": "admin",
+ "security.sasl.password": "admin-secret",
+ })
+ with pytest.raises(fluss.FlussError):
+ await fluss.FlussConnection.create(config)
diff --git a/crates/fluss/src/client/connection.rs
b/crates/fluss/src/client/connection.rs
index a17e57f..703b588 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -23,6 +23,7 @@ use crate::config::Config;
use crate::rpc::RpcClient;
use parking_lot::RwLock;
use std::sync::Arc;
+use std::time::Duration;
use crate::error::{Error, FlussError, Result};
use crate::metadata::TablePath;
@@ -36,7 +37,22 @@ pub struct FlussConnection {
impl FlussConnection {
pub async fn new(arg: Config) -> Result<Self> {
- let connections = Arc::new(RpcClient::new());
+ arg.validate_security()
+ .map_err(|msg| Error::IllegalArgument { message: msg })?;
+
+ let timeout = Duration::from_millis(arg.connect_timeout_ms);
+ let connections = if arg.is_sasl_enabled() {
+ Arc::new(
+ RpcClient::new()
+ .with_sasl(
+ arg.security_sasl_username.clone(),
+ arg.security_sasl_password.clone(),
+ )
+ .with_timeout(timeout),
+ )
+ } else {
+ Arc::new(RpcClient::new().with_timeout(timeout))
+ };
let metadata = Metadata::new(arg.bootstrap_servers.as_str(),
connections.clone()).await?;
Ok(FlussConnection {
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index a0d7e70..438c948 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -30,6 +30,9 @@ const DEFAULT_MAX_POLL_RECORDS: usize = 500;
const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;
const DEFAULT_ACKS: &str = "all";
+const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 120_000;
+const DEFAULT_SECURITY_PROTOCOL: &str = "PLAINTEXT";
+const DEFAULT_SASL_MECHANISM: &str = "PLAIN";
/// Bucket assigner strategy for tables without bucket keys.
/// Matches Java `client.writer.bucket.no-key-assigner`.
@@ -51,7 +54,7 @@ impl fmt::Display for NoKeyAssigner {
}
}
-#[derive(Parser, Debug, Clone, Deserialize, Serialize)]
+#[derive(Parser, Clone, Deserialize, Serialize)]
#[command(author, version, about, long_about = None)]
pub struct Config {
#[arg(long, default_value_t = String::from(DEFAULT_BOOTSTRAP_SERVER))]
@@ -96,6 +99,58 @@ pub struct Config {
/// Default: 100 (matching Java CLIENT_WRITER_BATCH_TIMEOUT)
#[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)]
pub writer_batch_timeout_ms: i64,
+
+ /// Connect timeout in milliseconds for TCP transport connect.
+ /// Default: 120000 (120 seconds).
+ #[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT_MS)]
+ pub connect_timeout_ms: u64,
+
+ #[arg(long, default_value_t = String::from(DEFAULT_SECURITY_PROTOCOL))]
+ pub security_protocol: String,
+
+ #[arg(long, default_value_t = String::from(DEFAULT_SASL_MECHANISM))]
+ pub security_sasl_mechanism: String,
+
+ #[arg(long, default_value_t = String::new())]
+ pub security_sasl_username: String,
+
+ #[arg(long, default_value_t = String::new())]
+ #[serde(skip_serializing)]
+ pub security_sasl_password: String,
+}
+
+impl std::fmt::Debug for Config {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Config")
+ .field("bootstrap_servers", &self.bootstrap_servers)
+ .field("writer_request_max_size", &self.writer_request_max_size)
+ .field("writer_acks", &self.writer_acks)
+ .field("writer_retries", &self.writer_retries)
+ .field("writer_batch_size", &self.writer_batch_size)
+ .field(
+ "writer_bucket_no_key_assigner",
+ &self.writer_bucket_no_key_assigner,
+ )
+ .field(
+ "scanner_remote_log_prefetch_num",
+ &self.scanner_remote_log_prefetch_num,
+ )
+ .field(
+ "remote_file_download_thread_num",
+ &self.remote_file_download_thread_num,
+ )
+ .field(
+ "scanner_log_max_poll_records",
+ &self.scanner_log_max_poll_records,
+ )
+ .field("writer_batch_timeout_ms", &self.writer_batch_timeout_ms)
+ .field("connect_timeout_ms", &self.connect_timeout_ms)
+ .field("security_protocol", &self.security_protocol)
+ .field("security_sasl_mechanism", &self.security_sasl_mechanism)
+ .field("security_sasl_username", &self.security_sasl_username)
+ .field("security_sasl_password", &"[REDACTED]")
+ .finish()
+ }
}
impl Default for Config {
@@ -112,6 +167,115 @@ impl Default for Config {
scanner_remote_log_read_concurrency:
DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY,
scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
+ connect_timeout_ms: DEFAULT_CONNECT_TIMEOUT_MS,
+ security_protocol: String::from(DEFAULT_SECURITY_PROTOCOL),
+ security_sasl_mechanism: String::from(DEFAULT_SASL_MECHANISM),
+ security_sasl_username: String::new(),
+ security_sasl_password: String::new(),
+ }
+ }
+}
+
+impl Config {
+ /// Returns true when the security protocol indicates SASL authentication
+ /// should be performed. Matches Java's `SaslAuthenticationPlugin` which
+ /// registers as `"sasl"` (case-insensitive).
+ pub fn is_sasl_enabled(&self) -> bool {
+ self.security_protocol.eq_ignore_ascii_case("sasl")
+ }
+
+ /// Validates security configuration. Returns `Ok(())` when the config is
+ /// consistent, or an error message when SASL is enabled but the config is
+ /// incomplete or uses an unsupported mechanism.
+ pub fn validate_security(&self) -> Result<(), String> {
+ if !self.is_sasl_enabled() {
+ return Ok(());
+ }
+ if !self.security_sasl_mechanism.eq_ignore_ascii_case("PLAIN") {
+ return Err(format!(
+ "Unsupported SASL mechanism: '{}'. Only 'PLAIN' is supported.",
+ self.security_sasl_mechanism
+ ));
+ }
+ if self.security_sasl_username.is_empty() {
+ return Err(
+ "security_sasl_username must be set when security_protocol is
'sasl'".to_string(),
+ );
}
+ if self.security_sasl_password.is_empty() {
+ return Err(
+ "security_sasl_password must be set when security_protocol is
'sasl'".to_string(),
+ );
+ }
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_default_is_not_sasl() {
+ let config = Config::default();
+ assert!(!config.is_sasl_enabled());
+ assert!(config.validate_security().is_ok());
+ }
+
+ #[test]
+ fn test_sasl_enabled_valid() {
+ let config = Config {
+ security_protocol: "sasl".to_string(),
+ security_sasl_mechanism: "PLAIN".to_string(),
+ security_sasl_username: "admin".to_string(),
+ security_sasl_password: "secret".to_string(),
+ ..Config::default()
+ };
+ assert!(config.is_sasl_enabled());
+ assert!(config.validate_security().is_ok());
+ }
+
+ #[test]
+ fn test_sasl_enabled_case_insensitive() {
+ let config = Config {
+ security_protocol: "SASL".to_string(),
+ security_sasl_username: "admin".to_string(),
+ security_sasl_password: "secret".to_string(),
+ ..Config::default()
+ };
+ assert!(config.is_sasl_enabled());
+ assert!(config.validate_security().is_ok());
+ }
+
+ #[test]
+ fn test_sasl_missing_username() {
+ let config = Config {
+ security_protocol: "sasl".to_string(),
+ security_sasl_password: "secret".to_string(),
+ ..Config::default()
+ };
+ assert!(config.validate_security().is_err());
+ }
+
+ #[test]
+ fn test_sasl_missing_password() {
+ let config = Config {
+ security_protocol: "sasl".to_string(),
+ security_sasl_username: "admin".to_string(),
+ ..Config::default()
+ };
+ assert!(config.validate_security().is_err());
+ }
+
+ #[test]
+ fn test_sasl_unsupported_mechanism() {
+ let config = Config {
+ security_protocol: "sasl".to_string(),
+ security_sasl_mechanism: "SCRAM-SHA-256".to_string(),
+ security_sasl_username: "admin".to_string(),
+ security_sasl_password: "secret".to_string(),
+ ..Config::default()
+ };
+ assert!(config.validate_security().is_err());
}
}
diff --git a/crates/fluss/src/proto/fluss_api.proto
b/crates/fluss/src/proto/fluss_api.proto
index eca4cf3..1c7ee7e 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -408,4 +408,13 @@ message DropPartitionRequest {
required bool ignore_if_not_exists = 3;
}
-message DropPartitionResponse {}
\ No newline at end of file
+message DropPartitionResponse {}
+
+message AuthenticateRequest {
+ required string protocol = 1;
+ required bytes token = 2;
+}
+
+message AuthenticateResponse {
+ optional bytes challenge = 1;
+}
\ No newline at end of file
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index f6009c0..4231fb0 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -40,6 +40,7 @@ pub enum ApiKey {
GetLatestLakeSnapshot,
CreatePartition,
DropPartition,
+ Authenticate,
Unknown(i16),
}
@@ -67,6 +68,7 @@ impl From<i16> for ApiKey {
1035 => ApiKey::GetDatabaseInfo,
1036 => ApiKey::CreatePartition,
1037 => ApiKey::DropPartition,
+ 1038 => ApiKey::Authenticate,
_ => Unknown(key),
}
}
@@ -96,6 +98,7 @@ impl From<ApiKey> for i16 {
ApiKey::GetDatabaseInfo => 1035,
ApiKey::CreatePartition => 1036,
ApiKey::DropPartition => 1037,
+ ApiKey::Authenticate => 1038,
Unknown(x) => x,
}
}
@@ -129,6 +132,7 @@ mod tests {
(1035, ApiKey::GetDatabaseInfo),
(1036, ApiKey::CreatePartition),
(1037, ApiKey::DropPartition),
+ (1038, ApiKey::Authenticate),
];
for (raw, key) in cases {
diff --git a/crates/fluss/src/rpc/message/authenticate.rs b/crates/fluss/src/rpc/message/authenticate.rs
new file mode 100644
index 0000000..1292cdc
--- /dev/null
+++ b/crates/fluss/src/rpc/message/authenticate.rs
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::{AuthenticateRequest as ProtoAuthenticateRequest, AuthenticateResponse};
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug, Clone)]
+pub struct AuthenticateRequest {
+ pub inner_request: ProtoAuthenticateRequest,
+}
+
+impl AuthenticateRequest {
+ /// Build a SASL/PLAIN authenticate request.
+ /// Token format: `\0<username>\0<password>` (NUL-separated UTF-8).
+ pub fn new_plain(username: &str, password: &str) -> Self {
+ let mut token = Vec::with_capacity(1 + username.len() + 1 + password.len());
+ token.push(0u8);
+ token.extend_from_slice(username.as_bytes());
+ token.push(0u8);
+ token.extend_from_slice(password.as_bytes());
+
+ Self {
+ inner_request: ProtoAuthenticateRequest {
+ protocol: "PLAIN".to_string(),
+ token,
+ },
+ }
+ }
+
+ /// Build an authenticate request from a server challenge (for multi-round auth).
+ pub fn from_challenge(protocol: &str, challenge: Vec<u8>) -> Self {
+ Self {
+ inner_request: ProtoAuthenticateRequest {
+ protocol: protocol.to_string(),
+ token: challenge,
+ },
+ }
+ }
+}
+
+impl RequestBody for AuthenticateRequest {
+ type ResponseBody = AuthenticateResponse;
+ const API_KEY: ApiKey = ApiKey::Authenticate;
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(AuthenticateRequest);
+impl_read_version_type!(AuthenticateResponse);
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_new_plain_token_format() {
+ let req = AuthenticateRequest::new_plain("admin", "secret");
+ assert_eq!(req.inner_request.protocol, "PLAIN");
+ assert_eq!(req.inner_request.token, b"\0admin\0secret");
+ }
+
+ #[test]
+ fn test_new_plain_empty_credentials() {
+ let req = AuthenticateRequest::new_plain("", "");
+ assert_eq!(req.inner_request.token, b"\0\0");
+ }
+}
diff --git a/crates/fluss/src/rpc/message/mod.rs
b/crates/fluss/src/rpc/message/mod.rs
index addb97a..9ad4545 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -20,6 +20,7 @@ use crate::rpc::api_version::ApiVersion;
use crate::rpc::frame::{ReadError, WriteError};
use bytes::{Buf, BufMut};
+mod authenticate;
mod create_database;
mod create_partition;
mod create_table;
@@ -44,6 +45,7 @@ mod table_exists;
mod update_metadata;
pub use crate::rpc::RpcError;
+pub use authenticate::*;
pub use create_database::*;
pub use create_partition::*;
pub use create_table::*;
diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs
index a345c2f..13c5d9c 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -29,6 +29,7 @@ use futures::future::BoxFuture;
use log::warn;
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
+use std::fmt;
use std::io::Cursor;
use std::ops::DerefMut;
use std::sync::Arc;
@@ -44,12 +45,34 @@ pub type MessengerTransport =
ServerConnectionInner<BufStream<Transport>>;
pub type ServerConnection = Arc<MessengerTransport>;
+// Matches Java's ExponentialBackoff(100ms initial, 2x multiplier, 5000ms max, 0.2 jitter).
+const AUTH_INITIAL_BACKOFF_MS: f64 = 100.0;
+const AUTH_MAX_BACKOFF_MS: f64 = 5000.0;
+const AUTH_BACKOFF_MULTIPLIER: f64 = 2.0;
+const AUTH_JITTER: f64 = 0.2;
+
+#[derive(Clone)]
+pub struct SaslConfig {
+ pub username: String,
+ pub password: String,
+}
+
+impl fmt::Debug for SaslConfig {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("SaslConfig")
+ .field("username", &self.username)
+ .field("password", &"[REDACTED]")
+ .finish()
+ }
+}
+
#[derive(Debug, Default)]
pub struct RpcClient {
connections: RwLock<HashMap<String, ServerConnection>>,
client_id: Arc<str>,
timeout: Option<Duration>,
max_message_size: usize,
+ sasl_config: Option<SaslConfig>,
}
impl RpcClient {
@@ -59,13 +82,24 @@ impl RpcClient {
client_id: Arc::from(""),
timeout: None,
max_message_size: usize::MAX,
+ sasl_config: None,
}
}
+ pub fn with_timeout(mut self, timeout: Duration) -> Self {
+ self.timeout = Some(timeout);
+ self
+ }
+
+ pub fn with_sasl(mut self, username: String, password: String) -> Self {
+ self.sasl_config = Some(SaslConfig { username, password });
+ self
+ }
+
pub async fn get_connection(
&self,
server_node: &ServerNode,
- ) -> Result<ServerConnection, RpcError> {
+ ) -> Result<ServerConnection, Error> {
let server_id = server_node.uid();
{
let connections = self.connections.read();
@@ -89,7 +123,7 @@ impl RpcClient {
Ok(new_server)
}
- async fn connect(&self, server_node: &ServerNode) ->
Result<ServerConnection, RpcError> {
+ async fn connect(&self, server_node: &ServerNode) ->
Result<ServerConnection, Error> {
let url = server_node.url();
let transport = Transport::connect(&url, self.timeout)
.await
@@ -100,7 +134,74 @@ impl RpcClient {
self.max_message_size,
self.client_id.clone(),
);
- Ok(ServerConnection::new(messenger))
+ let connection = ServerConnection::new(messenger);
+
+ if let Some(ref sasl) = self.sasl_config {
+ Self::authenticate(&connection, &sasl.username, &sasl.password).await?;
+ }
+
+ Ok(connection)
+ }
+
+ /// Perform SASL/PLAIN authentication handshake.
+ ///
+ /// Retries on `RetriableAuthenticateException` with exponential backoff
+ /// (matching Java's unbounded retry behaviour). Non-retriable errors
+ /// (wrong password, unknown user) propagate immediately as
+ /// `Error::FlussAPIError` with the original error code.
+ async fn authenticate(
+ connection: &ServerConnection,
+ username: &str,
+ password: &str,
+ ) -> Result<(), Error> {
+ use crate::rpc::fluss_api_error::FlussError;
+ use crate::rpc::message::AuthenticateRequest;
+ use rand::Rng;
+
+ let initial_request = AuthenticateRequest::new_plain(username, password);
+ let mut retry_count: u32 = 0;
+
+ loop {
+ let request = initial_request.clone();
+ let result = connection.request(request).await;
+
+ match result {
+ Ok(response) => {
+ // Check for server challenge (multi-round auth).
+ // PLAIN mechanism never sends a challenge, but we handle it
+ // for protocol correctness matching Java's handleAuthenticateResponse.
+ if let Some(challenge) = response.challenge {
+ let challenge_req = AuthenticateRequest::from_challenge("PLAIN", challenge);
+ connection.request(challenge_req).await?;
+ }
+ return Ok(());
+ }
+ Err(Error::FlussAPIError { ref api_error })
+ if FlussError::for_code(api_error.code)
+ == FlussError::RetriableAuthenticateException =>
+ {
+ retry_count += 1;
+ // Cap the exponent like Java's ExponentialBackoff.expMax so that
+ // jitter still produces a range at steady state instead of being
+ // clamped to AUTH_MAX_BACKOFF_MS.
+ let exp_max = (AUTH_MAX_BACKOFF_MS / AUTH_INITIAL_BACKOFF_MS).log2();
+ let exp = ((retry_count as f64) - 1.0).min(exp_max);
+ let term = AUTH_INITIAL_BACKOFF_MS * AUTH_BACKOFF_MULTIPLIER.powf(exp);
+ let jitter_factor =
+ 1.0 - AUTH_JITTER + rand::rng().random::<f64>() * (2.0 * AUTH_JITTER);
+ let backoff_ms = (term * jitter_factor) as u64;
+ log::warn!(
+ "SASL authentication retriable failure (attempt
{retry_count}), \
+ retrying in {backoff_ms}ms: {}",
+ api_error.message
+ );
+ tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
+ }
+ // Server-side auth errors (wrong password, unknown user, etc.)
+ // propagate with their original error code preserved.
+ Err(e) => return Err(e),
+ }
+ }
}
}
diff --git a/crates/fluss/tests/integration/admin.rs
b/crates/fluss/tests/integration/admin.rs
index 3502923..5bbdaf0 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -15,49 +15,19 @@
// specific language governing permissions and limitations
// under the License.
-use crate::integration::fluss_cluster::FlussTestingCluster;
-use parking_lot::RwLock;
-
-use std::sync::Arc;
-use std::sync::LazyLock;
-
-#[cfg(test)]
-use test_env_helpers::*;
-
-// Module-level shared cluster instance (only for this test file)
-static SHARED_FLUSS_CLUSTER:
LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>> =
- LazyLock::new(|| Arc::new(RwLock::new(None)));
-
#[cfg(test)]
-#[before_all]
-#[after_all]
mod admin_test {
- use super::SHARED_FLUSS_CLUSTER;
- use crate::integration::fluss_cluster::FlussTestingCluster;
- use crate::integration::utils::{get_cluster, start_cluster, stop_cluster};
+ use crate::integration::utils::get_shared_cluster;
use fluss::error::FlussError;
use fluss::metadata::{
DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat,
PartitionSpec, Schema,
TableDescriptor, TablePath,
};
use std::collections::HashMap;
- use std::sync::Arc;
-
- fn before_all() {
- start_cluster("test-admin", SHARED_FLUSS_CLUSTER.clone());
- }
-
- fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
- get_cluster(&SHARED_FLUSS_CLUSTER)
- }
-
- fn after_all() {
- stop_cluster(SHARED_FLUSS_CLUSTER.clone());
- }
#[tokio::test]
async fn test_create_database() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("should get admin");
@@ -97,13 +67,11 @@ mod admin_test {
// database shouldn't exist now
assert!(!admin.database_exists(db_name).await.unwrap());
-
- // Note: We don't stop the shared cluster here as it's used by other
tests
}
#[tokio::test]
async fn test_create_table() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection
.get_admin()
@@ -232,7 +200,7 @@ mod admin_test {
#[tokio::test]
async fn test_partition_apis() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection
.get_admin()
@@ -371,7 +339,7 @@ mod admin_test {
#[tokio::test]
async fn test_fluss_error_response() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection
.get_admin()
@@ -405,7 +373,7 @@ mod admin_test {
#[tokio::test]
async fn test_error_database_not_exist() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.unwrap();
@@ -424,7 +392,7 @@ mod admin_test {
#[tokio::test]
async fn test_error_database_already_exist() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.unwrap();
@@ -454,7 +422,7 @@ mod admin_test {
#[tokio::test]
async fn test_error_table_already_exist() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.unwrap();
@@ -502,7 +470,7 @@ mod admin_test {
#[tokio::test]
async fn test_error_table_not_exist() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.unwrap();
@@ -564,7 +532,7 @@ mod admin_test {
#[tokio::test]
async fn test_error_table_not_partitioned() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.unwrap();
diff --git a/crates/fluss/tests/integration/fluss_cluster.rs
b/crates/fluss/tests/integration/fluss_cluster.rs
index e4dcad9..a2e9157 100644
--- a/crates/fluss/tests/integration/fluss_cluster.rs
+++ b/crates/fluss/tests/integration/fluss_cluster.rs
@@ -25,7 +25,8 @@ use testcontainers::core::ContainerPort;
use testcontainers::runners::AsyncRunner;
use testcontainers::{ContainerAsync, GenericImage, ImageExt};
-const FLUSS_VERSION: &str = "0.7.0";
+const FLUSS_VERSION: &str = "0.8.0-incubating";
+const FLUSS_IMAGE: &str = "apache/fluss";
pub struct FlussTestingClusterBuilder {
number_of_tablet_servers: i32,
@@ -33,9 +34,20 @@ pub struct FlussTestingClusterBuilder {
cluster_conf: HashMap<String, String>,
testing_name: String,
remote_data_dir: Option<std::path::PathBuf>,
+ sasl_enabled: bool,
+ sasl_users: Vec<(String, String)>,
+ /// Host port for the coordinator server (default 9123).
+ coordinator_host_port: u16,
+ /// Host port for the plaintext (PLAIN_CLIENT) listener.
+ /// When set together with `sasl_enabled`, the cluster exposes two
listeners:
+ /// CLIENT (SASL) on `coordinator_host_port` and PLAIN_CLIENT on this port.
+ plain_client_port: Option<u16>,
+ image: String,
+ image_tag: String,
}
impl FlussTestingClusterBuilder {
+ #[allow(dead_code)]
pub fn new(testing_name: impl Into<String>) -> Self {
Self::new_with_cluster_conf(testing_name.into(), &HashMap::default())
}
@@ -47,6 +59,18 @@ impl FlussTestingClusterBuilder {
self
}
+ /// Enable SASL/PLAIN authentication on the cluster with dual listeners.
+ /// Users are specified as `(username, password)` pairs.
+ /// This automatically configures a PLAIN_CLIENT (plaintext) listener in
addition
+ /// to the CLIENT (SASL) listener, allowing both authenticated and
unauthenticated
+ /// connections on the same cluster.
+ pub fn with_sasl(mut self, users: Vec<(String, String)>) -> Self {
+ self.sasl_enabled = true;
+ self.sasl_users = users;
+ self.plain_client_port = Some(self.coordinator_host_port + 100);
+ self
+ }
+
pub fn new_with_cluster_conf(
testing_name: impl Into<String>,
conf: &HashMap<String, String>,
@@ -68,6 +92,12 @@ impl FlussTestingClusterBuilder {
network: "fluss-cluster-network",
testing_name: testing_name.into(),
remote_data_dir: None,
+ sasl_enabled: false,
+ sasl_users: Vec::new(),
+ coordinator_host_port: 9123,
+ plain_client_port: None,
+ image: FLUSS_IMAGE.to_string(),
+ image_tag: FLUSS_VERSION.to_string(),
}
}
@@ -84,6 +114,43 @@ impl FlussTestingClusterBuilder {
}
pub async fn build(&mut self) -> FlussTestingCluster {
+ // Remove stale containers from previous runs (if any) so we can reuse
names.
+ let stale_containers: Vec<String> =
std::iter::once(self.zookeeper_container_name())
+ .chain(std::iter::once(self.coordinator_server_container_name()))
+ .chain(
+ (0..self.number_of_tablet_servers).map(|id|
self.tablet_server_container_name(id)),
+ )
+ .collect();
+ for name in &stale_containers {
+ let _ = std::process::Command::new("docker")
+ .args(["rm", "-f", name])
+ .output();
+ }
+
+ // Inject SASL server-side configuration into cluster_conf
+ if self.sasl_enabled && !self.sasl_users.is_empty() {
+ self.cluster_conf.insert(
+ "security.protocol.map".to_string(),
+ "CLIENT:sasl".to_string(),
+ );
+ self.cluster_conf.insert(
+ "security.sasl.enabled.mechanisms".to_string(),
+ "plain".to_string(),
+ );
+ // Build JAAS config: user_<name>="<password>" for each user
+ let user_entries: Vec<String> = self
+ .sasl_users
+ .iter()
+ .map(|(u, p)| format!("user_{}=\"{}\"", u, p))
+ .collect();
+ let jaas_config = format!(
+ "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule
required {};",
+ user_entries.join(" ")
+ );
+ self.cluster_conf
+ .insert("security.sasl.plain.jaas.config".to_string(),
jaas_config);
+ }
+
let zookeeper = Arc::new(
GenericImage::new("zookeeper", "3.9.2")
.with_network(self.network)
@@ -103,64 +170,122 @@ impl FlussTestingClusterBuilder {
);
}
+ // When dual listeners are configured, bootstrap_servers points to the
plaintext
+ // listener and sasl_bootstrap_servers points to the SASL listener.
+ let (bootstrap_servers, sasl_bootstrap_servers) =
+ if let Some(plain_port) = self.plain_client_port {
+ (
+ format!("127.0.0.1:{}", plain_port),
+ Some(format!("127.0.0.1:{}", self.coordinator_host_port)),
+ )
+ } else {
+ (format!("127.0.0.1:{}", self.coordinator_host_port), None)
+ };
+
FlussTestingCluster {
zookeeper,
coordinator_server,
tablet_servers,
- bootstrap_servers: "127.0.0.1:9123".to_string(),
+ bootstrap_servers,
+ sasl_bootstrap_servers,
remote_data_dir: self.remote_data_dir.clone(),
+ sasl_users: self.sasl_users.clone(),
+ container_names: stale_containers,
}
}
async fn start_coordinator_server(&mut self) ->
ContainerAsync<GenericImage> {
+ let port = self.coordinator_host_port;
+ let container_name = self.coordinator_server_container_name();
let mut coordinator_confs = HashMap::new();
coordinator_confs.insert(
"zookeeper.address",
format!("{}:2181", self.zookeeper_container_name()),
);
- coordinator_confs.insert(
- "bind.listeners",
- format!(
- "INTERNAL://{}:0, CLIENT://{}:9123",
- self.coordinator_server_container_name(),
- self.coordinator_server_container_name()
- ),
- );
- coordinator_confs.insert(
- "advertised.listeners",
- "CLIENT://localhost:9123".to_string(),
- );
+
+ if let Some(plain_port) = self.plain_client_port {
+ // Dual listeners: CLIENT (SASL) + PLAIN_CLIENT (plaintext)
+ coordinator_confs.insert(
+ "bind.listeners",
+ format!(
+ "INTERNAL://{}:0, CLIENT://{}:{}, PLAIN_CLIENT://{}:{}",
+ container_name, container_name, port, container_name,
plain_port
+ ),
+ );
+ coordinator_confs.insert(
+ "advertised.listeners",
+ format!(
+ "CLIENT://localhost:{}, PLAIN_CLIENT://localhost:{}",
+ port, plain_port
+ ),
+ );
+ } else {
+ coordinator_confs.insert(
+ "bind.listeners",
+ format!(
+ "INTERNAL://{}:0, CLIENT://{}:{}",
+ container_name, container_name, port
+ ),
+ );
+ coordinator_confs.insert(
+ "advertised.listeners",
+ format!("CLIENT://localhost:{}", port),
+ );
+ }
+
coordinator_confs.insert("internal.listener.name",
"INTERNAL".to_string());
- GenericImage::new("fluss/fluss", FLUSS_VERSION)
+
+ let mut image = GenericImage::new(&self.image, &self.image_tag)
.with_container_name(self.coordinator_server_container_name())
- .with_mapped_port(9123, ContainerPort::Tcp(9123))
+ .with_mapped_port(port, ContainerPort::Tcp(port))
.with_network(self.network)
.with_cmd(vec!["coordinatorServer"])
.with_env_var(
"FLUSS_PROPERTIES",
self.to_fluss_properties_with(coordinator_confs),
- )
- .start()
- .await
- .unwrap()
+ );
+
+ if let Some(plain_port) = self.plain_client_port {
+ image = image.with_mapped_port(plain_port,
ContainerPort::Tcp(plain_port));
+ }
+
+ image.start().await.unwrap()
}
async fn start_tablet_server(&self, server_id: i32) ->
ContainerAsync<GenericImage> {
+ let port = self.coordinator_host_port;
+ let container_name = self.tablet_server_container_name(server_id);
let mut tablet_server_confs = HashMap::new();
- let bind_listeners = format!(
- "INTERNAL://{}:0, CLIENT://{}:9123",
- self.tablet_server_container_name(server_id),
- self.tablet_server_container_name(server_id),
- );
- let expose_host_port = 9124 + server_id;
- let advertised_listeners = format!("CLIENT://localhost:{}",
expose_host_port);
+ let expose_host_port = (port as i32) + 1 + server_id;
let tablet_server_id = format!("{}", server_id);
+
+ if let Some(plain_port) = self.plain_client_port {
+ // Dual listeners: CLIENT (SASL) + PLAIN_CLIENT (plaintext)
+ let bind_listeners = format!(
+ "INTERNAL://{}:0, CLIENT://{}:{}, PLAIN_CLIENT://{}:{}",
+ container_name, container_name, port, container_name,
plain_port,
+ );
+ let plain_expose_host_port = (plain_port as i32) + 1 + server_id;
+ let advertised_listeners = format!(
+ "CLIENT://localhost:{}, PLAIN_CLIENT://localhost:{}",
+ expose_host_port, plain_expose_host_port
+ );
+ tablet_server_confs.insert("bind.listeners", bind_listeners);
+ tablet_server_confs.insert("advertised.listeners",
advertised_listeners);
+ } else {
+ let bind_listeners = format!(
+ "INTERNAL://{}:0, CLIENT://{}:{}",
+ container_name, container_name, port,
+ );
+ let advertised_listeners = format!("CLIENT://localhost:{}",
expose_host_port);
+ tablet_server_confs.insert("bind.listeners", bind_listeners);
+ tablet_server_confs.insert("advertised.listeners",
advertised_listeners);
+ }
+
tablet_server_confs.insert(
"zookeeper.address",
format!("{}:2181", self.zookeeper_container_name()),
);
- tablet_server_confs.insert("bind.listeners", bind_listeners);
- tablet_server_confs.insert("advertised.listeners",
advertised_listeners);
tablet_server_confs.insert("internal.listener.name",
"INTERNAL".to_string());
tablet_server_confs.insert("tablet-server.id", tablet_server_id);
@@ -172,9 +297,9 @@ impl FlussTestingClusterBuilder {
remote_data_dir.to_string_lossy().to_string(),
);
}
- let mut image = GenericImage::new("fluss/fluss", FLUSS_VERSION)
+ let mut image = GenericImage::new(&self.image, &self.image_tag)
.with_cmd(vec!["tabletServer"])
- .with_mapped_port(expose_host_port as u16,
ContainerPort::Tcp(9123))
+ .with_mapped_port(expose_host_port as u16,
ContainerPort::Tcp(port))
.with_network(self.network)
.with_container_name(self.tablet_server_container_name(server_id))
.with_env_var(
@@ -182,6 +307,15 @@ impl FlussTestingClusterBuilder {
self.to_fluss_properties_with(tablet_server_confs),
);
+ // Add port mapping for plaintext listener
+ if let Some(plain_port) = self.plain_client_port {
+ let plain_expose_host_port = (plain_port as i32) + 1 + server_id;
+ image = image.with_mapped_port(
+ plain_expose_host_port as u16,
+ ContainerPort::Tcp(plain_port),
+ );
+ }
+
// Add volume mount if remote_data_dir is provided
if let Some(ref remote_data_dir) = self.remote_data_dir {
use testcontainers::core::Mount;
@@ -210,35 +344,45 @@ impl FlussTestingClusterBuilder {
/// Provides an easy way to launch a Fluss cluster with coordinator and tablet
servers.
#[derive(Clone)]
+#[allow(dead_code)] // Fields held for RAII (keeping Docker containers alive).
pub struct FlussTestingCluster {
zookeeper: Arc<ContainerAsync<GenericImage>>,
coordinator_server: Arc<ContainerAsync<GenericImage>>,
tablet_servers: HashMap<i32, Arc<ContainerAsync<GenericImage>>>,
+ /// Bootstrap servers for plaintext connections.
+ /// When dual listeners are configured, this points to the PLAIN_CLIENT
listener.
bootstrap_servers: String,
+ /// Bootstrap servers for SASL connections (only set when dual listeners
are configured).
+ sasl_bootstrap_servers: Option<String>,
remote_data_dir: Option<std::path::PathBuf>,
+ sasl_users: Vec<(String, String)>,
+ container_names: Vec<String>,
}
impl FlussTestingCluster {
- pub async fn stop(&self) {
- for tablet_server in self.tablet_servers.values() {
- tablet_server.stop().await.unwrap()
+ /// Synchronously stops and removes all Docker containers and cleans up the
+ /// remote data directory. Safe to call from non-async contexts (e.g.
atexit).
+ #[allow(dead_code)]
+ pub fn stop(&self) {
+ for name in &self.container_names {
+ let _ = std::process::Command::new("docker")
+ .args(["rm", "-f", name])
+ .output();
}
- self.coordinator_server.stop().await.unwrap();
- self.zookeeper.stop().await.unwrap();
- if let Some(remote_data_dir) = &self.remote_data_dir {
- // Try to clean up the remote data directory, but don't fail if it
can't be deleted.
- // This can happen in CI environments or if Docker containers are
still using the directory.
- // The directory will be cleaned up by the CI system or OS
eventually.
- if let Err(e) = tokio::fs::remove_dir_all(remote_data_dir).await {
- eprintln!(
- "Warning: Failed to delete remote data directory: {:?},
error: {:?}. \
- This is non-fatal and the directory may be cleaned up
later.",
- remote_data_dir, e
- );
- }
+ if let Some(ref dir) = self.remote_data_dir {
+ let _ = std::fs::remove_dir_all(dir);
}
}
+ pub fn sasl_users(&self) -> &[(String, String)] {
+ &self.sasl_users
+ }
+
+ /// Returns the plaintext (non-SASL) bootstrap servers address.
+ pub fn plaintext_bootstrap_servers(&self) -> &str {
+ &self.bootstrap_servers
+ }
+
pub async fn get_fluss_connection(&self) -> FlussConnection {
let config = Config {
writer_acks: "all".to_string(),
@@ -246,6 +390,58 @@ impl FlussTestingCluster {
..Default::default()
};
+ self.connect_with_retry(config).await
+ }
+
+ /// Connect with SASL/PLAIN credentials.
+ /// Uses `sasl_bootstrap_servers` when dual listeners are configured.
+ pub async fn get_fluss_connection_with_sasl(
+ &self,
+ username: &str,
+ password: &str,
+ ) -> FlussConnection {
+ let bootstrap = self
+ .sasl_bootstrap_servers
+ .clone()
+ .unwrap_or_else(|| self.bootstrap_servers.clone());
+ let config = Config {
+ writer_acks: "all".to_string(),
+ bootstrap_servers: bootstrap,
+ security_protocol: "sasl".to_string(),
+ security_sasl_mechanism: "PLAIN".to_string(),
+ security_sasl_username: username.to_string(),
+ security_sasl_password: password.to_string(),
+ ..Default::default()
+ };
+
+ self.connect_with_retry(config).await
+ }
+
+ /// Try to connect with SASL/PLAIN credentials, returning the error on
failure.
+ /// Uses `sasl_bootstrap_servers` when dual listeners are configured.
+ pub async fn try_fluss_connection_with_sasl(
+ &self,
+ username: &str,
+ password: &str,
+ ) -> fluss::error::Result<FlussConnection> {
+ let bootstrap = self
+ .sasl_bootstrap_servers
+ .clone()
+ .unwrap_or_else(|| self.bootstrap_servers.clone());
+ let config = Config {
+ writer_acks: "all".to_string(),
+ bootstrap_servers: bootstrap,
+ security_protocol: "sasl".to_string(),
+ security_sasl_mechanism: "PLAIN".to_string(),
+ security_sasl_username: username.to_string(),
+ security_sasl_password: password.to_string(),
+ ..Default::default()
+ };
+
+ FlussConnection::new(config).await
+ }
+
+ async fn connect_with_retry(&self, config: Config) -> FlussConnection {
// Retry mechanism: retry for up to 1 minute
let max_retries = 60; // 60 retry attempts
let retry_interval = Duration::from_secs(1); // 1 second interval
between retries
diff --git a/crates/fluss/tests/integration/kv_table.rs
b/crates/fluss/tests/integration/kv_table.rs
index c101a18..f0e0c57 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -16,42 +16,11 @@
* limitations under the License.
*/
-use parking_lot::RwLock;
-use std::sync::Arc;
-use std::sync::LazyLock;
-
-use crate::integration::fluss_cluster::FlussTestingCluster;
#[cfg(test)]
-use test_env_helpers::*;
-
-// Module-level shared cluster instance (only for this test file)
-static SHARED_FLUSS_CLUSTER:
LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>> =
- LazyLock::new(|| Arc::new(RwLock::new(None)));
-
-#[cfg(test)]
-#[before_all]
-#[after_all]
mod kv_table_test {
- use super::SHARED_FLUSS_CLUSTER;
- use crate::integration::fluss_cluster::FlussTestingCluster;
- use crate::integration::utils::{
- create_partitions, create_table, get_cluster, start_cluster,
stop_cluster,
- };
+ use crate::integration::utils::{create_partitions, create_table,
get_shared_cluster};
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::row::{GenericRow, InternalRow};
- use std::sync::Arc;
-
- fn before_all() {
- start_cluster("test_kv_table", SHARED_FLUSS_CLUSTER.clone());
- }
-
- fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
- get_cluster(&SHARED_FLUSS_CLUSTER)
- }
-
- fn after_all() {
- stop_cluster(SHARED_FLUSS_CLUSTER.clone());
- }
fn make_key(id: i32) -> GenericRow<'static> {
let mut row = GenericRow::new(3);
@@ -61,7 +30,7 @@ mod kv_table_test {
#[tokio::test]
async fn upsert_delete_and_lookup() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.unwrap();
@@ -200,7 +169,7 @@ mod kv_table_test {
#[tokio::test]
async fn composite_primary_keys() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.unwrap();
@@ -310,7 +279,7 @@ mod kv_table_test {
async fn partial_update() {
use fluss::row::Datum;
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -431,7 +400,7 @@ mod kv_table_test {
#[tokio::test]
async fn partitioned_table_upsert_and_lookup() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -601,7 +570,7 @@ mod kv_table_test {
async fn all_supported_datatypes() {
use fluss::row::{Date, Datum, Decimal, Time, TimestampLtz,
TimestampNtz};
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("Failed to get admin");
diff --git a/crates/fluss/tests/integration/log_table.rs
b/crates/fluss/tests/integration/log_table.rs
index 779ffdd..4aa88ac 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -16,55 +16,21 @@
* limitations under the License.
*/
-use parking_lot::RwLock;
-use std::sync::Arc;
-use std::sync::LazyLock;
-
-use crate::integration::fluss_cluster::FlussTestingCluster;
-#[cfg(test)]
-use test_env_helpers::*;
-
-// Module-level shared cluster instance (only for this test file)
-static SHARED_FLUSS_CLUSTER:
LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>> =
- LazyLock::new(|| Arc::new(RwLock::new(None)));
-
#[cfg(test)]
-#[before_all]
-#[after_all]
mod table_test {
- use super::SHARED_FLUSS_CLUSTER;
- use crate::integration::fluss_cluster::FlussTestingCluster;
- use crate::integration::utils::{
- create_partitions, create_table, get_cluster, start_cluster,
stop_cluster,
- };
+ use crate::integration::utils::{create_partitions, create_table,
get_shared_cluster};
use arrow::array::record_batch;
use fluss::client::{EARLIEST_OFFSET, FlussTable, TableScan};
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::record::ScanRecord;
use fluss::row::InternalRow;
use fluss::rpc::message::OffsetSpec;
- use jiff::Timestamp;
use std::collections::HashMap;
- use std::sync::Arc;
- use std::sync::atomic::AtomicUsize;
- use std::sync::atomic::Ordering;
use std::time::Duration;
- fn before_all() {
- start_cluster("test_table", SHARED_FLUSS_CLUSTER.clone());
- }
-
- fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
- get_cluster(&SHARED_FLUSS_CLUSTER)
- }
-
- fn after_all() {
- stop_cluster(SHARED_FLUSS_CLUSTER.clone());
- }
-
#[tokio::test]
async fn append_record_batch_and_scan() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -174,7 +140,7 @@ mod table_test {
#[tokio::test]
async fn list_offsets() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -221,8 +187,6 @@ mod table_test {
"Latest offset should be 0 for empty table"
);
- let before_append_ms = Timestamp::now().as_millisecond();
-
// Append some records
let append_writer = connection
.get_table(&table_path)
@@ -247,8 +211,6 @@ mod table_test {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
- let after_append_ms = Timestamp::now().as_millisecond();
-
// Test latest offset after appending (should be 3)
let latest_offsets_after = admin
.list_offsets(&table_path, &[0], OffsetSpec::Latest)
@@ -273,34 +235,65 @@ mod table_test {
"Earliest offset should still be 0"
);
- // Test list_offsets_by_timestamp
+ // Scan records back to get server-assigned timestamps (avoids
host/container
+ // clock skew issues that make host-based timestamps unreliable).
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+ let log_scanner = table
+ .new_scan()
+ .create_log_scanner()
+ .expect("Failed to create log scanner");
+ log_scanner
+ .subscribe(0, EARLIEST_OFFSET)
+ .await
+ .expect("Failed to subscribe");
+
+ let mut record_timestamps: Vec<i64> = Vec::new();
+ let scan_start = std::time::Instant::now();
+ while record_timestamps.len() < 3 && scan_start.elapsed() <
Duration::from_secs(10) {
+ let scan_records = log_scanner
+ .poll(Duration::from_millis(500))
+ .await
+ .expect("Failed to poll records");
+ for rec in scan_records {
+ record_timestamps.push(rec.timestamp());
+ }
+ }
+ assert_eq!(record_timestamps.len(), 3, "Expected 3 record timestamps");
+
+ let min_ts = *record_timestamps.iter().min().unwrap();
+ let max_ts = *record_timestamps.iter().max().unwrap();
- let timestamp_offsets = admin
- .list_offsets(&table_path, &[0],
OffsetSpec::Timestamp(before_append_ms))
+ // Timestamp before all records should resolve to offset 0
+ let before_offsets = admin
+ .list_offsets(&table_path, &[0], OffsetSpec::Timestamp(min_ts - 1))
.await
- .expect("Failed to list offsets by timestamp");
+ .expect("Failed to list offsets by timestamp (before)");
assert_eq!(
- timestamp_offsets.get(&0),
+ before_offsets.get(&0),
Some(&0),
- "Timestamp before append should resolve to offset 0 (start of new
data)"
+ "Timestamp before first record should resolve to offset 0"
);
- let timestamp_offsets = admin
- .list_offsets(&table_path, &[0],
OffsetSpec::Timestamp(after_append_ms))
+ // Timestamp after all records should resolve to offset 3
+ let after_offsets = admin
+ .list_offsets(&table_path, &[0], OffsetSpec::Timestamp(max_ts + 1))
.await
- .expect("Failed to list offsets by timestamp");
+ .expect("Failed to list offsets by timestamp (after)");
assert_eq!(
- timestamp_offsets.get(&0),
+ after_offsets.get(&0),
Some(&3),
- "Timestamp after append should resolve to offset 0 (no newer
records)"
+ "Timestamp after last record should resolve to offset 3"
);
}
#[tokio::test]
async fn test_project() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -456,7 +449,7 @@ mod table_test {
#[tokio::test]
async fn test_poll_batches() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -588,7 +581,7 @@ mod table_test {
async fn all_supported_datatypes() {
use fluss::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz,
TimestampNtz};
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -1019,7 +1012,7 @@ mod table_test {
#[tokio::test]
async fn partitioned_table_append_scan() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("Failed to get admin");
@@ -1314,7 +1307,7 @@ mod table_test {
#[tokio::test]
async fn undersized_row_returns_error() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("Failed to get admin");
diff --git a/crates/fluss/tests/integration/sasl_auth.rs
b/crates/fluss/tests/integration/sasl_auth.rs
new file mode 100644
index 0000000..878c983
--- /dev/null
+++ b/crates/fluss/tests/integration/sasl_auth.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(test)]
+mod sasl_auth_test {
+ use crate::integration::utils::get_shared_cluster;
+ use fluss::client::FlussConnection;
+ use fluss::config::Config;
+ use fluss::error::FlussError;
+ use fluss::metadata::DatabaseDescriptorBuilder;
+
+ const SASL_USERNAME: &str = "admin";
+ const SASL_PASSWORD: &str = "admin-secret";
+
+ /// Verify that a client with correct SASL credentials can connect and
perform operations.
+ #[tokio::test]
+ async fn test_sasl_connect_with_valid_credentials() {
+ let cluster = get_shared_cluster();
+ let connection = cluster
+ .get_fluss_connection_with_sasl(SASL_USERNAME, SASL_PASSWORD)
+ .await;
+
+ let admin = connection
+ .get_admin()
+ .await
+ .expect("Should get admin with valid SASL credentials");
+
+ // Perform a basic operation to confirm the connection is fully
functional
+ let db_name = "sasl_test_valid_db";
+ let descriptor = DatabaseDescriptorBuilder::default()
+ .comment("created via SASL auth")
+ .build();
+
+ admin
+ .create_database(db_name, Some(&descriptor), true)
+ .await
+ .expect("Should create database with SASL auth");
+
+ assert!(admin.database_exists(db_name).await.unwrap());
+
+ // Cleanup
+ admin
+ .drop_database(db_name, true, true)
+ .await
+ .expect("Should drop database");
+ }
+
+ /// Verify that a second user can also authenticate successfully.
+ #[tokio::test]
+ async fn test_sasl_connect_with_second_user() {
+ let cluster = get_shared_cluster();
+ let connection = cluster
+ .get_fluss_connection_with_sasl("alice", "alice-secret")
+ .await;
+
+ let admin = connection
+ .get_admin()
+ .await
+ .expect("Should get admin with alice credentials");
+
+ // Basic operation to confirm functional connection
+ assert!(
+ admin
+ .database_exists("some_nonexistent_db_alice")
+ .await
+ .is_ok()
+ );
+ }
+
+ /// Verify that wrong credentials are rejected with a typed
AuthenticateException error.
+ #[tokio::test]
+ async fn test_sasl_connect_with_wrong_password() {
+ let cluster = get_shared_cluster();
+ let result = cluster
+ .try_fluss_connection_with_sasl(SASL_USERNAME, "wrong-password")
+ .await;
+
+ let err = match result {
+ Err(e) => e,
+ Ok(_) => panic!("Connection with wrong password should fail"),
+ };
+
+ // The server error code must be preserved (not wrapped in a generic
string).
+ // Code 46 = AuthenticateException — this is what C++ and Python
bindings
+ // use to distinguish auth failures from network errors.
+ assert_eq!(
+ err.api_error(),
+ Some(FlussError::AuthenticateException),
+ "Wrong password should produce AuthenticateException, got: {err}"
+ );
+ }
+
+ /// Verify that a SASL-configured client fails when connecting to a
plaintext server.
+ #[tokio::test]
+ async fn test_sasl_client_to_plaintext_server() {
+ let cluster = get_shared_cluster();
+ let plaintext_addr = cluster.plaintext_bootstrap_servers().to_string();
+
+ let config = Config {
+ writer_acks: "all".to_string(),
+ bootstrap_servers: plaintext_addr,
+ security_protocol: "sasl".to_string(),
+ security_sasl_mechanism: "PLAIN".to_string(),
+ security_sasl_username: SASL_USERNAME.to_string(),
+ security_sasl_password: SASL_PASSWORD.to_string(),
+ ..Default::default()
+ };
+
+ let result = FlussConnection::new(config).await;
+ assert!(
+ result.is_err(),
+ "SASL client connecting to plaintext server should fail"
+ );
+ }
+
+ /// Verify that a nonexistent user is rejected with a typed error.
+ #[tokio::test]
+ async fn test_sasl_connect_with_unknown_user() {
+ let cluster = get_shared_cluster();
+ let result = cluster
+ .try_fluss_connection_with_sasl("nonexistent_user",
"some-password")
+ .await;
+
+ let err = match result {
+ Err(e) => e,
+ Ok(_) => panic!("Connection with unknown user should fail"),
+ };
+
+ assert_eq!(
+ err.api_error(),
+ Some(FlussError::AuthenticateException),
+ "Unknown user should produce AuthenticateException, got: {err}"
+ );
+ }
+}
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs
b/crates/fluss/tests/integration/table_remote_scan.rs
index fcd6773..52b8974 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -15,107 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-use crate::integration::fluss_cluster::FlussTestingCluster;
-use parking_lot::RwLock;
-use std::sync::Arc;
-use std::sync::LazyLock;
#[cfg(test)]
-use test_env_helpers::*;
-
-// Module-level shared cluster instance (only for this test file)
-static SHARED_FLUSS_CLUSTER:
LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>> =
- LazyLock::new(|| Arc::new(RwLock::new(None)));
-
-#[cfg(test)]
-#[before_all]
-#[after_all]
mod table_remote_scan_test {
- use super::SHARED_FLUSS_CLUSTER;
- use crate::integration::fluss_cluster::{FlussTestingCluster,
FlussTestingClusterBuilder};
- use crate::integration::utils::{
- create_table, get_cluster, stop_cluster, wait_for_cluster_ready,
- };
+ use crate::integration::utils::{create_table, get_shared_cluster};
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::row::{GenericRow, InternalRow};
- use std::collections::HashMap;
- use std::sync::Arc;
- use std::sync::atomic::AtomicUsize;
- use std::thread;
use std::time::Duration;
- use uuid::Uuid;
- fn before_all() {
- // Create a new tokio runtime in a separate thread
- let cluster_lock = SHARED_FLUSS_CLUSTER.clone();
- thread::spawn(move || {
- let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
- rt.block_on(async {
- // Create a temporary directory for remote data that can be
accessed from both
- // container and host. Use a fixed path so it's the same in
container and host.
- // On macOS, Docker Desktop may have issues with /tmp, so we
use a path in the
- // current working directory or user's home directory which
Docker can access.
- let temp_dir = std::env::current_dir()
- .unwrap_or_else(|_| std::path::PathBuf::from("."))
- .join("target")
- .join(format!("test-remote-data-{}", Uuid::new_v4()));
-
- // Remove existing directory if it exists to start fresh
- let _ = std::fs::remove_dir_all(&temp_dir);
- std::fs::create_dir_all(&temp_dir)
- .expect("Failed to create temporary directory for remote
data");
- println!("temp_dir: {:?}", temp_dir);
-
- // Verify directory was created and is accessible
- if !temp_dir.exists() {
- panic!("Remote data directory was not created: {:?}",
temp_dir);
- }
-
- // Get absolute path for Docker mount
- let temp_dir = temp_dir
- .canonicalize()
- .expect("Failed to canonicalize remote data directory
path");
-
- let mut cluster_conf = HashMap::new();
- // set to a small size to make data can be tiered to remote
- cluster_conf.insert("log.segment.file-size".to_string(),
"120b".to_string());
- cluster_conf.insert(
- "remote.log.task-interval-duration".to_string(),
- "1s".to_string(),
- );
- // remote.data.dir uses the same path in container and host
- cluster_conf.insert(
- "remote.data.dir".to_string(),
- temp_dir.to_string_lossy().to_string(),
- );
-
- let cluster =
FlussTestingClusterBuilder::new_with_cluster_conf(
- "test_table_remote",
- &cluster_conf,
- )
- .with_remote_data_dir(temp_dir)
- .build()
- .await;
- wait_for_cluster_ready(&cluster).await;
- let mut guard = cluster_lock.write();
- *guard = Some(cluster);
- });
- })
- .join()
- .expect("Failed to create cluster");
- }
-
- fn after_all() {
- stop_cluster(SHARED_FLUSS_CLUSTER.clone());
- }
#[tokio::test]
async fn test_scan_remote_log() {
- let cluster = get_fluss_cluster();
+ let cluster = get_shared_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path = TablePath::new("fluss",
"test_append_record_batch_and_scan");
+ let table_path = TablePath::new("fluss", "test_scan_remote_log");
let table_descriptor = TableDescriptor::builder()
.schema(
@@ -206,8 +121,4 @@ mod table_remote_scan_test {
);
}
}
-
- fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
- get_cluster(&SHARED_FLUSS_CLUSTER)
- }
}
diff --git a/crates/fluss/tests/integration/utils.rs
b/crates/fluss/tests/integration/utils.rs
index ae61d3a..b53abc8 100644
--- a/crates/fluss/tests/integration/utils.rs
+++ b/crates/fluss/tests/integration/utils.rs
@@ -18,20 +18,97 @@
use crate::integration::fluss_cluster::{FlussTestingCluster,
FlussTestingClusterBuilder};
use fluss::client::FlussAdmin;
use fluss::metadata::{PartitionSpec, TableDescriptor, TablePath};
-use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
+use std::sync::LazyLock;
use std::time::Duration;
-/// Polls the cluster until CoordinatorEventProcessor is initialized and
tablet server is available.
-/// Times out after 20 seconds.
-pub async fn wait_for_cluster_ready(cluster: &FlussTestingCluster) {
- let timeout = Duration::from_secs(20);
+extern "C" fn cleanup_on_exit() {
+ SHARED_CLUSTER.stop();
+}
+
+/// Shared cluster with dual listeners: PLAIN_CLIENT (plaintext) on port 9223
+/// and CLIENT (SASL) on port 9123. Includes remote storage config so
+/// table_remote_scan can also use this cluster.
+static SHARED_CLUSTER: LazyLock<FlussTestingCluster> = LazyLock::new(|| {
+ std::thread::spawn(|| {
+ let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
+ rt.block_on(async {
+ let temp_dir = std::env::current_dir()
+ .unwrap_or_else(|_| std::path::PathBuf::from("."))
+ .join("target")
+ .join(format!("test-remote-data-{}", uuid::Uuid::new_v4()));
+ let _ = std::fs::remove_dir_all(&temp_dir);
+ std::fs::create_dir_all(&temp_dir)
+ .expect("Failed to create temporary directory for remote
data");
+ let temp_dir = temp_dir
+ .canonicalize()
+ .expect("Failed to canonicalize remote data directory path");
+
+ let mut cluster_conf = HashMap::new();
+ cluster_conf.insert("log.segment.file-size".to_string(),
"120b".to_string());
+ cluster_conf.insert(
+ "remote.log.task-interval-duration".to_string(),
+ "1s".to_string(),
+ );
+
+ let cluster =
+
FlussTestingClusterBuilder::new_with_cluster_conf("shared-test", &cluster_conf)
+ .with_sasl(vec![
+ ("admin".to_string(), "admin-secret".to_string()),
+ ("alice".to_string(), "alice-secret".to_string()),
+ ])
+ .with_remote_data_dir(temp_dir)
+ .build()
+ .await;
+ wait_for_cluster_ready_with_sasl(&cluster).await;
+
+ // Register cleanup so containers are removed on process exit.
+ unsafe {
+ unsafe extern "C" {
+ fn atexit(f: extern "C" fn()) -> std::os::raw::c_int;
+ }
+ atexit(cleanup_on_exit);
+ }
+
+ cluster
+ })
+ })
+ .join()
+ .expect("Failed to initialize shared cluster")
+});
+
+/// Returns a fresh `Arc` wrapping a clone of the shared test cluster
+/// (each call clones the cluster handle; callers do not share one `Arc`).
+pub fn get_shared_cluster() -> Arc<FlussTestingCluster> {
+ Arc::new(SHARED_CLUSTER.clone())
+}
+
+pub async fn create_table(
+ admin: &FlussAdmin,
+ table_path: &TablePath,
+ table_descriptor: &TableDescriptor,
+) {
+ admin
+ .create_table(table_path, table_descriptor, false)
+ .await
+ .expect("Failed to create table");
+}
+
+/// Similar to wait_for_cluster_ready but connects with SASL credentials.
+pub async fn wait_for_cluster_ready_with_sasl(cluster: &FlussTestingCluster) {
+ let timeout = Duration::from_secs(30);
let poll_interval = Duration::from_millis(500);
let start = std::time::Instant::now();
+ let (username, password) = cluster
+ .sasl_users()
+ .first()
+ .expect("SASL cluster must have at least one user");
+
loop {
- let connection = cluster.get_fluss_connection().await;
+ let connection = cluster
+ .get_fluss_connection_with_sasl(username, password)
+ .await;
if connection.get_admin().await.is_ok()
&& connection
.get_metadata()
@@ -44,7 +121,7 @@ pub async fn wait_for_cluster_ready(cluster:
&FlussTestingCluster) {
if start.elapsed() >= timeout {
panic!(
- "Server readiness check timed out after {} seconds. \
+ "SASL server readiness check timed out after {} seconds. \
CoordinatorEventProcessor may not be initialized or
TabletServer may not be available.",
timeout.as_secs()
);
@@ -54,56 +131,6 @@ pub async fn wait_for_cluster_ready(cluster:
&FlussTestingCluster) {
}
}
-pub async fn create_table(
- admin: &FlussAdmin,
- table_path: &TablePath,
- table_descriptor: &TableDescriptor,
-) {
- admin
- .create_table(&table_path, &table_descriptor, false)
- .await
- .expect("Failed to create table");
-}
-
-pub fn start_cluster(name: &str, cluster_lock:
Arc<RwLock<Option<FlussTestingCluster>>>) {
- let name = name.to_string();
- std::thread::spawn(move || {
- let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
- rt.block_on(async {
- let cluster = FlussTestingClusterBuilder::new(&name).build().await;
- wait_for_cluster_ready(&cluster).await;
- let mut guard = cluster_lock.write();
- *guard = Some(cluster);
- });
- })
- .join()
- .expect("Failed to create cluster");
-}
-
-pub fn stop_cluster(cluster_lock: Arc<RwLock<Option<FlussTestingCluster>>>) {
- std::thread::spawn(move || {
- let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
- rt.block_on(async {
- let mut guard = cluster_lock.write();
- if let Some(cluster) = guard.take() {
- cluster.stop().await;
- }
- });
- })
- .join()
- .expect("Failed to cleanup cluster");
-}
-
-pub fn get_cluster(cluster_lock: &RwLock<Option<FlussTestingCluster>>) ->
Arc<FlussTestingCluster> {
- let guard = cluster_lock.read();
- Arc::new(
- guard
- .as_ref()
- .expect("Fluss cluster not initialized. Make sure before_all() was
called.")
- .clone(),
- )
-}
-
/// Creates partitions for a partitioned table.
///
/// # Arguments
diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs
index a6cc27a..9675646 100644
--- a/crates/fluss/tests/test_fluss.rs
+++ b/crates/fluss/tests/test_fluss.rs
@@ -24,6 +24,7 @@ mod integration {
mod fluss_cluster;
mod kv_table;
mod log_table;
+ mod sasl_auth;
mod utils;
diff --git a/website/docs/user-guide/cpp/error-handling.md
b/website/docs/user-guide/cpp/error-handling.md
index 76b03e3..3ded0c2 100644
--- a/website/docs/user-guide/cpp/error-handling.md
+++ b/website/docs/user-guide/cpp/error-handling.md
@@ -94,6 +94,7 @@ if (!result.Ok()) {
| `ErrorCode::PARTITION_ALREADY_EXISTS` | 42 | Partition already
exists |
| `ErrorCode::PARTITION_SPEC_INVALID_EXCEPTION` | 43 | Invalid partition
spec |
| `ErrorCode::LEADER_NOT_AVAILABLE_EXCEPTION` | 44 | No leader available
for partition |
+| `ErrorCode::AUTHENTICATE_EXCEPTION` | 46 | Authentication failed
(bad credentials) |
See `fluss::ErrorCode` in `fluss.hpp` for the full list of named constants.
@@ -147,6 +148,26 @@ if (!result.Ok()) {
}
```
+### Authentication Failed
+
+SASL credentials are incorrect or the user does not exist:
+
+```cpp
+fluss::Configuration config;
+config.bootstrap_servers = "127.0.0.1:9123";
+config.security_protocol = "sasl";
+config.security_sasl_username = "admin";
+config.security_sasl_password = "wrong-password";
+
+fluss::Connection conn;
+fluss::Result result = fluss::Connection::Create(config, conn);
+if (!result.Ok()) {
+ if (result.error_code == fluss::ErrorCode::AUTHENTICATE_EXCEPTION) {
+ std::cerr << "Authentication failed: " << result.error_message <<
std::endl;
+ }
+}
+```
+
### Schema Mismatch
Using incorrect types or column indices when writing:
diff --git a/website/docs/user-guide/cpp/example/configuration.md
b/website/docs/user-guide/cpp/example/configuration.md
index 2245ee1..f4b6309 100644
--- a/website/docs/user-guide/cpp/example/configuration.md
+++ b/website/docs/user-guide/cpp/example/configuration.md
@@ -25,15 +25,32 @@ All fields have sensible defaults. Only `bootstrap_servers`
typically needs to b
```cpp
fluss::Configuration config;
-config.bootstrap_servers = "127.0.0.1:9123"; // Coordinator address
-config.writer_request_max_size = 10 * 1024 * 1024; // Max request size (10
MB)
-config.writer_acks = "all"; // Wait for all replicas
-config.writer_retries = std::numeric_limits<int32_t>::max(); // Retry on
failure
-config.writer_batch_size = 2 * 1024 * 1024; // Batch size (2 MB)
-config.writer_batch_timeout_ms = 100; // Max time to wait for a
batch to fill
-config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin"
-config.scanner_remote_log_prefetch_num = 4; // Remote log prefetch count
-config.remote_file_download_thread_num = 3; // Download threads
-config.scanner_remote_log_read_concurrency = 4; // In-file remote log read
concurrency
-config.scanner_log_max_poll_records = 500; // Max records returned per
poll()
+config.bootstrap_servers = "127.0.0.1:9123"; // Coordinator
address
+config.writer_request_max_size = 10 * 1024 * 1024; // Max request
size (10 MB)
+config.writer_acks = "all"; // Wait for all
replicas
+config.writer_retries = std::numeric_limits<int32_t>::max(); // Retry on
failure
+config.writer_batch_size = 2 * 1024 * 1024; // Batch size (2
MB)
+config.writer_batch_timeout_ms = 100; // Max time to
wait for a batch to fill
+config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or
"round_robin"
+config.scanner_remote_log_prefetch_num = 4; // Remote log
prefetch count
+config.remote_file_download_thread_num = 3; // Download
threads
+config.scanner_remote_log_read_concurrency = 4; // In-file
remote log read concurrency
+config.scanner_log_max_poll_records = 500; // Max records
per poll
+config.connect_timeout_ms = 120000; // TCP connect
timeout (ms)
+```
+
+## SASL Authentication
+
+To connect to a Fluss cluster with SASL/PLAIN authentication enabled:
+
+```cpp
+fluss::Configuration config;
+config.bootstrap_servers = "127.0.0.1:9123";
+config.security_protocol = "sasl";
+config.security_sasl_mechanism = "PLAIN";
+config.security_sasl_username = "admin";
+config.security_sasl_password = "admin-secret";
+
+fluss::Connection conn;
+fluss::Result result = fluss::Connection::Create(config, conn);
```
diff --git a/website/docs/user-guide/python/error-handling.md
b/website/docs/user-guide/python/error-handling.md
index 9fa4821..50a9e46 100644
--- a/website/docs/user-guide/python/error-handling.md
+++ b/website/docs/user-guide/python/error-handling.md
@@ -53,6 +53,7 @@ except fluss.FlussError as e:
| `ErrorCode.PARTITION_ALREADY_EXISTS` | 42 | Partition already
exists |
| `ErrorCode.PARTITION_SPEC_INVALID_EXCEPTION` | 43 | Invalid partition spec
|
| `ErrorCode.LEADER_NOT_AVAILABLE_EXCEPTION` | 44 | No leader available
for partition |
+| `ErrorCode.AUTHENTICATE_EXCEPTION` | 46 | Authentication failed
(bad credentials) |
See `fluss.ErrorCode` for the full list of named constants.
@@ -95,6 +96,24 @@ except fluss.FlussError as e:
print("Partition does not exist, create it first")
```
+### Authentication Failed
+
+SASL credentials are incorrect or the user does not exist.
+
+```python
+try:
+ config = fluss.Config({
+ "bootstrap.servers": "127.0.0.1:9123",
+ "client.security.protocol": "sasl",
+ "client.security.sasl.username": "admin",
+ "client.security.sasl.password": "wrong-password",
+ })
+ conn = await fluss.FlussConnection.create(config)
+except fluss.FlussError as e:
+ if e.error_code == fluss.ErrorCode.AUTHENTICATE_EXCEPTION:
+ print(f"Authentication failed: {e.message}")
+```
+
### Schema Mismatch
Row data doesn't match the table schema.
diff --git a/website/docs/user-guide/python/example/configuration.md
b/website/docs/user-guide/python/example/configuration.md
index 39c53be..90b1249 100644
--- a/website/docs/user-guide/python/example/configuration.md
+++ b/website/docs/user-guide/python/example/configuration.md
@@ -28,12 +28,34 @@ with await fluss.FlussConnection.create(config) as conn:
| `writer.acks` | Acknowledgment setting (`all` waits
for all replicas) | `all` |
| `writer.retries` | Number of retries on failure
| `2147483647` |
| `writer.batch-size` | Batch size for writes in bytes
| `2097152` (2 MB) |
-| `writer.batch-timeout-ms` | The maximum time to wait for a
writer batch to fill up before sending. | `100` |
-| `writer.bucket.no-key-assigner` | Bucket assignment strategy for
tables without bucket keys: `sticky` or `round_robin` | `sticky` |
+| `writer.batch-timeout-ms` | The maximum time to wait for a
writer batch to fill up before sending. | `100` |
+| `writer.bucket.no-key-assigner` | Bucket assignment strategy for
tables without bucket keys: `sticky` or `round_robin` | `sticky` |
| `scanner.remote-log.prefetch-num` | Number of remote log segments to
prefetch | `4` |
| `remote-file.download-thread-num` | Number of threads for remote log
downloads | `3` |
| `scanner.remote-log.read-concurrency` | Streaming read concurrency within a
remote log file | `4` |
| `scanner.log.max-poll-records` | Max records returned in a single
poll() | `500` |
+| `connect-timeout` | TCP connect timeout in milliseconds
| `120000` |
+| `security.protocol` | `PLAINTEXT` (default) or `sasl` for
SASL auth | `PLAINTEXT` |
+| `security.sasl.mechanism` | SASL mechanism (only `PLAIN` is
supported) | `PLAIN` |
+| `security.sasl.username` | SASL username (required when
protocol is `sasl`) | (empty) |
+| `security.sasl.password` | SASL password (required when
protocol is `sasl`) | (empty) |
+
+## SASL Authentication
+
+To connect to a Fluss cluster with SASL/PLAIN authentication enabled:
+
+```python
+config = fluss.Config({
+ "bootstrap.servers": "127.0.0.1:9123",
+ "security.protocol": "sasl",
+ "security.sasl.mechanism": "PLAIN",
+ "security.sasl.username": "admin",
+ "security.sasl.password": "admin-secret",
+})
+conn = await fluss.FlussConnection.create(config)
+```
+
+## Connection Lifecycle
Remember to close the connection when done:
diff --git a/website/docs/user-guide/rust/error-handling.md
b/website/docs/user-guide/rust/error-handling.md
index 35ede6c..964f81f 100644
--- a/website/docs/user-guide/rust/error-handling.md
+++ b/website/docs/user-guide/rust/error-handling.md
@@ -71,6 +71,9 @@ match result {
Err(ref e) if e.api_error() ==
Some(FlussError::LeaderNotAvailableException) => {
eprintln!("Leader not available: {}", e);
}
+ Err(ref e) if e.api_error() == Some(FlussError::AuthenticateException) => {
+ eprintln!("Authentication failed: {}", e);
+ }
_ => {}
}
```
@@ -133,6 +136,22 @@ match result {
}
```
+### Authentication Failed
+
+SASL credentials are incorrect or the user does not exist.
+
+```rust
+use fluss::error::{Error, FlussError};
+
+let result = FlussConnection::new(config).await;
+match result {
+ Err(ref e) if e.api_error() == Some(FlussError::AuthenticateException) => {
+ eprintln!("Authentication failed: {}", e);
+ }
+ _ => {}
+}
+```
+
### Schema Mismatch
Row data does not match the expected table schema.
diff --git a/website/docs/user-guide/rust/example/configuration.md
b/website/docs/user-guide/rust/example/configuration.md
index a2f52dc..f6340c9 100644
--- a/website/docs/user-guide/rust/example/configuration.md
+++ b/website/docs/user-guide/rust/example/configuration.md
@@ -17,16 +17,36 @@ let conn = FlussConnection::new(config).await?;
## Connection Configurations
-| Option | Description
| Default |
-|---------------------------------|--------------------------------------------------------------------------------------|------------------|
-| `bootstrap_servers` | Coordinator server address
| `127.0.0.1:9123` |
-| `writer_request_max_size` | Maximum request size in bytes
| 10 MB |
-| `writer_acks` | Acknowledgment setting (`all` waits for
all replicas) | `all` |
-| `writer_retries` | Number of retries on failure
| `i32::MAX` |
-| `writer_batch_size` | Batch size for writes
| 2 MB |
-| `writer_batch_timeout_ms` | The maximum time to wait for a writer
batch to fill up before sending. | `100` |
-| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables
without bucket keys: `sticky` or `round_robin` | `sticky` |
-| `scanner_remote_log_prefetch_num` | Number of remote log segments to
prefetch | `4` |
-| `remote_file_download_thread_num` | Number of concurrent remote log file
downloads | `3` |
-| `scanner_remote_log_read_concurrency` | Streaming read concurrency within a
remote log file | `4` |
-| `scanner_log_max_poll_records` | Maximum records returned in a single
`poll()` | `500` |
+| Option | Description
| Default |
+|---------------------------------------|--------------------------------------------------------------------------------------|------------------|
+| `bootstrap_servers` | Coordinator server address
| `127.0.0.1:9123` |
+| `writer_request_max_size` | Maximum request size in bytes
| 10 MB |
+| `writer_acks` | Acknowledgment setting (`all` waits
for all replicas) | `all` |
+| `writer_retries` | Number of retries on failure
| `i32::MAX` |
+| `writer_batch_size` | Batch size for writes
| 2 MB |
+| `writer_batch_timeout_ms` | The maximum time to wait for a
writer batch to fill up before sending. | `100` |
+| `writer_bucket_no_key_assigner` | Bucket assignment strategy for
tables without bucket keys: `sticky` or `round_robin` | `sticky` |
+| `scanner_remote_log_prefetch_num` | Number of remote log segments to
prefetch | `4` |
+| `remote_file_download_thread_num` | Number of concurrent remote log file
downloads | `3` |
+| `scanner_remote_log_read_concurrency` | Streaming read concurrency within a
remote log file | `4` |
+| `scanner_log_max_poll_records` | Maximum records returned in a single
`poll()` | `500` |
+| `connect_timeout_ms` | TCP connect timeout in milliseconds
| 120000 |
+| `security_protocol` | `PLAINTEXT` (default) or `sasl` for
SASL auth | `PLAINTEXT` |
+| `security_sasl_mechanism` | SASL mechanism (only `PLAIN` is
supported) | `PLAIN` |
+| `security_sasl_username` | SASL username (required when
protocol is `sasl`) | (empty) |
+| `security_sasl_password` | SASL password (required when
protocol is `sasl`) | (empty) |
+
+## SASL Authentication
+
+To connect to a Fluss cluster with SASL/PLAIN authentication enabled:
+
+```rust
+let mut config = Config::default();
+config.bootstrap_servers = "127.0.0.1:9123".to_string();
+config.security_protocol = "sasl".to_string();
+config.security_sasl_mechanism = "PLAIN".to_string();
+config.security_sasl_username = "admin".to_string();
+config.security_sasl_password = "admin-secret".to_string();
+
+let conn = FlussConnection::new(config).await?;
+```