Copilot commented on code in PR #3138: URL: https://github.com/apache/brpc/pull/3138#discussion_r2501501962
########## src/brpc/couchbase.h: ########## @@ -0,0 +1,538 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_COUCHBASE_H +#define BRPC_COUCHBASE_H + +#endif + +#include <brpc/channel.h> + +#include <atomic> +#include <condition_variable> +#include <mutex> +#include <queue> +#include <string> +#include <unordered_map> + +#include "brpc/nonreflectable_message.h" +#include "brpc/pb_compat.h" +#include "butil/iobuf.h" +#include "butil/strings/string_piece.h" +using namespace std; Review Comment: `using namespace std;` is a bad practice in header files as it pollutes the global namespace for all files that include this header. Consider using explicit `std::` prefixes instead or placing specific using declarations inside the `brpc` namespace. ```suggestion ``` ########## example/couchbase_c++/multithreaded_couchbase_client.cpp: ########## @@ -0,0 +1,375 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <brpc/couchbase.h> +#include <bthread/bthread.h> +#include <butil/logging.h> +#include <butil/string_printf.h> + +#include <atomic> +#include <iostream> +#include <string> +#include <vector> + +// ANSI color codes +#define GREEN "\033[32m" +#define RED "\033[31m" +#define BLUE "\033[34m" +#define YELLOW "\033[33m" +#define CYAN "\033[36m" +#define RESET "\033[0m" + +const int NUM_THREADS = 20; +const int THREADS_PER_BUCKET = 5; + +// Simple global config +struct { + std::string username = "Administrator"; + std::string password = "password"; + std::vector<std::string> bucket_names = {"t0", "t1", "t2", "t3"}; +} g_config; + +// Simple thread statistics +struct ThreadStats { + std::atomic<int> operations_attempted{0}; + std::atomic<int> operations_successful{0}; + std::atomic<int> operations_failed{0}; + + void reset() { + operations_attempted = 0; + operations_successful = 0; + operations_failed = 0; + } +}; + +// Global statistics +struct GlobalStats { + ThreadStats total; + std::vector<ThreadStats> per_thread_stats; + + GlobalStats() : per_thread_stats(NUM_THREADS) {} + + void aggregate_stats() { + total.reset(); + for (const auto& stats : per_thread_stats) { + total.operations_attempted += stats.operations_attempted.load(); + total.operations_successful += stats.operations_successful.load(); + total.operations_failed += stats.operations_failed.load(); + } + } +} g_stats; + +// Simple thread arguments +struct ThreadArgs { + int thread_id; + int bucket_id; + std::string bucket_name; + brpc::CouchbaseOperations* couchbase_ops; + ThreadStats* stats; +}; + +// Simple CRUD operations on default collection +void perform_crud_operations_default(brpc::CouchbaseOperations& couchbase_ops, + const std::string& base_key, + ThreadStats* stats) { + std::string key = base_key + "_default"; + std::string value = butil::string_printf( + R"({"thread_id": %d, "collection": "default"})", (int)bthread_self()); + + stats->operations_attempted++; + + // UPSERT + brpc::CouchbaseOperations::Result result = couchbase_ops.upsert(key, value); + if (result.success) { + stats->operations_successful++; + } else { + stats->operations_failed++; + return; + } + + stats->operations_attempted++; + + // GET + result = couchbase_ops.get(key); + if (result.success) { + stats->operations_successful++; + } else { + stats->operations_failed++; + return; + } + + stats->operations_attempted++; + + // DELETE + result = couchbase_ops.delete_(key); + if (result.success) { + stats->operations_successful++; + } else { + stats->operations_failed++; + } +} + +// Simple CRUD operations on col1 collection +void perform_crud_operations_col1(brpc::CouchbaseOperations& couchbase_ops, + const std::string& base_key, + ThreadStats* stats) { + std::string key = base_key + "_col1"; + std::string value = butil::string_printf( + R"({"thread_id": %d, "collection": "col1"})", (int)bthread_self()); + + stats->operations_attempted++; + + // UPSERT + brpc::CouchbaseOperations::Result result = + couchbase_ops.upsert(key, value, "col1"); + if (result.success) { + stats->operations_successful++; + } else { + stats->operations_failed++; + cout << "UPSERT failed: " << result.error_message << std::endl; Review Comment: The `cout` should be `std::cout`. While there's a `using namespace std;` at the top of the file, it's better to be explicit for consistency with other code in the project. ########## example/couchbase_c++/multithreaded_couchbase_client.cpp: ########## @@ -0,0 +1,375 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <brpc/couchbase.h> +#include <bthread/bthread.h> +#include <butil/logging.h> +#include <butil/string_printf.h> + +#include <atomic> +#include <iostream> +#include <string> +#include <vector> + +// ANSI color codes +#define GREEN "\033[32m" +#define RED "\033[31m" +#define BLUE "\033[34m" +#define YELLOW "\033[33m" +#define CYAN "\033[36m" +#define RESET "\033[0m" + +const int NUM_THREADS = 20; +const int THREADS_PER_BUCKET = 5; + +// Simple global config +struct { + std::string username = "Administrator"; + std::string password = "password"; + std::vector<std::string> bucket_names = {"t0", "t1", "t2", "t3"}; +} g_config; + +// Simple thread statistics +struct ThreadStats { + std::atomic<int> operations_attempted{0}; + std::atomic<int> operations_successful{0}; + std::atomic<int> operations_failed{0}; + + void reset() { + operations_attempted = 0; + operations_successful = 0; + operations_failed = 0; + } +}; + +// Global statistics +struct GlobalStats { + ThreadStats total; + std::vector<ThreadStats> per_thread_stats; + + GlobalStats() : per_thread_stats(NUM_THREADS) {} + + void aggregate_stats() { + total.reset(); + for (const auto& stats : per_thread_stats) { + total.operations_attempted += stats.operations_attempted.load(); + total.operations_successful += stats.operations_successful.load(); + total.operations_failed += stats.operations_failed.load(); + } + } +} g_stats; + +// Simple thread arguments +struct ThreadArgs { + int thread_id; + int bucket_id; + std::string bucket_name; + brpc::CouchbaseOperations* couchbase_ops; + ThreadStats* stats; +}; + +// Simple CRUD operations on default collection +void perform_crud_operations_default(brpc::CouchbaseOperations& couchbase_ops, + const std::string& base_key, + ThreadStats* stats) { + std::string key = base_key + "_default"; + std::string value = butil::string_printf( + R"({"thread_id": %d, "collection": "default"})", (int)bthread_self()); + + stats->operations_attempted++; + + // UPSERT + brpc::CouchbaseOperations::Result result = couchbase_ops.upsert(key, value); + if (result.success) { + stats->operations_successful++; + } else { + stats->operations_failed++; + return; + } + + stats->operations_attempted++; + + // GET + result = couchbase_ops.get(key); + if (result.success) { + stats->operations_successful++; + } else { + stats->operations_failed++; + return; + } + + stats->operations_attempted++; + + // DELETE + result = couchbase_ops.delete_(key); + if (result.success) { + stats->operations_successful++; + } else { + stats->operations_failed++; + } +} + +// Simple CRUD operations on col1 collection +void perform_crud_operations_col1(brpc::CouchbaseOperations& couchbase_ops, + const std::string& base_key, + ThreadStats* stats) { + std::string key = base_key + "_col1"; + std::string value = butil::string_printf( + R"({"thread_id": %d, "collection": "col1"})", (int)bthread_self()); + + stats->operations_attempted++; + + // UPSERT + brpc::CouchbaseOperations::Result result = + couchbase_ops.upsert(key, value, "col1"); + if (result.success) { + stats->operations_successful++; + } else { + stats->operations_failed++; + cout << "UPSERT failed: " << result.error_message << std::endl; + return; + } + + stats->operations_attempted++; + + // GET + result = couchbase_ops.get(key, "col1"); + if (result.success) { + stats->operations_successful++; + } else { + stats->operations_failed++; + cout << "GET failed: " << result.error_message << std::endl; Review Comment: The `cout` should be `std::cout`. While there's a `using namespace std;` at the top of the file, it's better to be explicit for consistency with other code in the project. ########## src/brpc/couchbase.cpp: ########## @@ -0,0 +1,2978 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/couchbase.h" + +#include <zlib.h> //for crc32 Vbucket_id + +// Debug flag for enabling debug statements +static bool DBUG = false; // Set to true to enable debug logs + +// Debug print macro +#define DEBUG_PRINT(msg) \ + do { \ + if (DBUG) { \ + std::cout << "[DEBUG] " << msg << std::endl; \ + } \ + } while (0) + +#include <iostream> + +#include "brpc/policy/couchbase_protocol.h" +#include "brpc/proto_base.pb.h" +#include "butil/logging.h" +#include "butil/macros.h" +#include "butil/string_printf.h" +#include "butil/sys_byteorder.h" +#include "butil/third_party/rapidjson/document.h" +#include "butil/third_party/rapidjson/rapidjson.h" + +namespace brpc { + +// Couchbase protocol constants +namespace { +[[maybe_unused]] constexpr uint32_t APPLE_VBUCKET_COUNT = 64; +constexpr uint32_t DEFAULT_VBUCKET_COUNT = 1024; +constexpr int CONNECTION_ID_SIZE = 33; +constexpr size_t RANDOM_ID_HEX_SIZE = 67; // 33 bytes * 2 + null terminator +} // namespace + +// Static member definitions +CouchbaseManifestManager* + CouchbaseOperations::CouchbaseRequest::metadata_tracking = + &common_metadata_tracking; + +bool brpc::CouchbaseManifestManager::setBucketToCollectionManifest( + string server, string bucket, + CouchbaseManifestManager::CollectionManifest manifest) { + // Then update the collection manifest with proper locking + { + UniqueLock write_lock(rw_bucket_to_collection_manifest_mutex_); + bucket_to_collection_manifest_[server][bucket] = manifest; + } + + return true; +} + +bool brpc::CouchbaseManifestManager::getBucketToCollectionManifest( + string server, string bucket, + CouchbaseManifestManager::CollectionManifest* manifest) { + SharedLock read_lock(rw_bucket_to_collection_manifest_mutex_); + auto it1 = bucket_to_collection_manifest_.find(server); + if (it1 == bucket_to_collection_manifest_.end()) { + return false; + } + auto it2 = it1->second.find(bucket); + if (it2 == it1->second.end()) { + return false; + } + *manifest = it2->second; + return true; +} + +bool brpc::CouchbaseManifestManager::getManifestToCollectionId( + CouchbaseManifestManager::CollectionManifest* manifest, string scope, + string collection, uint8_t* collection_id) { + if (manifest == nullptr || collection_id == nullptr) { + DEBUG_PRINT("Invalid input: manifest or collection_id is null"); + return false; + } + auto it1 = manifest->scope_to_collection_id_map.find(scope); + if (it1 == manifest->scope_to_collection_id_map.end()) { + DEBUG_PRINT("Scope: " << scope << " not found in manifest"); + return false; + } + auto it2 = it1->second.find(collection); + if (it2 == it1->second.end()) { + DEBUG_PRINT("Collection: " << collection + << " not found in scope: " << scope); + return false; + } + *collection_id = it2->second; + return true; +} + +bool CouchbaseManifestManager::jsonToCollectionManifest( + const string& json, + CouchbaseManifestManager::CollectionManifest* manifest) { + if (manifest == nullptr) { + DEBUG_PRINT("Invalid input: manifest is null"); + return false; + } + + // Clear existing data + manifest->uid.clear(); + manifest->scope_to_collection_id_map.clear(); + + if (json.empty()) { + DEBUG_PRINT("JSON string is empty"); + return false; + } + + // Parse JSON using RapidJSON + BUTIL_RAPIDJSON_NAMESPACE::Document document; + document.Parse(json.c_str()); + + if (document.HasParseError()) { + DEBUG_PRINT("Failed to parse JSON: " << document.GetParseError()); + return false; + } + + if (!document.IsObject()) { + DEBUG_PRINT("JSON root is not an object"); + return false; + } + + // Extract uid + if (document.HasMember("uid") && document["uid"].IsString()) { + manifest->uid = document["uid"].GetString(); + } else { + DEBUG_PRINT("Missing or invalid 'uid' field in JSON"); + return false; + } + + // Extract scopes + if (!document.HasMember("scopes") || !document["scopes"].IsArray()) { + DEBUG_PRINT("Missing or invalid 'scopes' field in JSON"); + return false; + } + + const BUTIL_RAPIDJSON_NAMESPACE::Value& scopes = document["scopes"]; + for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < scopes.Size(); ++i) { + const BUTIL_RAPIDJSON_NAMESPACE::Value& scope = scopes[i]; + + if (!scope.IsObject()) { + DEBUG_PRINT("Scope at index " << i << " is not an object"); + return false; + } + + // Extract scope name + if (!scope.HasMember("name") || !scope["name"].IsString()) { + DEBUG_PRINT("Missing or invalid 'name' field in scope at index " << i); + return false; + } + string scope_name = scope["name"].GetString(); + + // Extract collections + if (!scope.HasMember("collections") || !scope["collections"].IsArray()) { + DEBUG_PRINT("Missing or invalid 'collections' field in scope '" + << scope_name << "'"); + return false; + } + + const BUTIL_RAPIDJSON_NAMESPACE::Value& collections = scope["collections"]; + unordered_map<string, uint8_t> collection_map; + + for (BUTIL_RAPIDJSON_NAMESPACE::SizeType j = 0; j < collections.Size(); + ++j) { + const BUTIL_RAPIDJSON_NAMESPACE::Value& collection = collections[j]; + + if (!collection.IsObject()) { + DEBUG_PRINT("Collection at index " << j << " in scope '" << scope_name + << "' is not an object"); + return false; + } + + // Extract collection name + if (!collection.HasMember("name") || !collection["name"].IsString()) { + DEBUG_PRINT("Missing or invalid 'name' field in collection at index " + << j << " in scope '" << scope_name << "'"); + return false; + } + string collection_name = collection["name"].GetString(); + + // Extract collection uid (hex string) + if (!collection.HasMember("uid") || !collection["uid"].IsString()) { + DEBUG_PRINT("Missing or invalid 'uid' field in collection '" + << collection_name << "' in scope '" << scope_name << "'"); + return false; + } + string collection_uid_str = collection["uid"].GetString(); + + // Convert hex string to uint8_t + uint8_t collection_id = 0; + try { + // Convert hex string to integer + unsigned long uid_val = std::stoul(collection_uid_str, nullptr, 16); + if (uid_val > 255) { + DEBUG_PRINT( + "Collection uid '" + << collection_uid_str << "' exceeds uint8_t range in collection '" + << collection_name << "' in scope '" << scope_name << "'"); + return false; + } + collection_id = static_cast<uint8_t>(uid_val); + } catch (const std::exception& e) { + DEBUG_PRINT("Failed to parse collection uid '" + << collection_uid_str << "' as hex in collection '" + << collection_name << "' in scope '" << scope_name << ": " + << e.what()); + return false; + } + + // Add to collection map + collection_map[collection_name] = collection_id; + } + + // Add scope and its collections to manifest + manifest->scope_to_collection_id_map[scope_name] = + std::move(collection_map); + } + + return true; +} + +bool CouchbaseManifestManager::refreshCollectionManifest( + brpc::Channel* channel, const string& server, const string& bucket, + unordered_map<string, CollectionManifest>* local_collection_manifest_cache) { + // first fetch the manifest + // then compare the UID with the cached one + if (channel == nullptr) { + DEBUG_PRINT("No channel found, make sure to call Authenticate() first"); + return false; + } + if (server.empty()) { + DEBUG_PRINT("Server is empty, make sure to call Authenticate() first"); + return false; + } + if (bucket.empty()) { + DEBUG_PRINT("No bucket selected, make sure to call SelectBucket() first"); + return false; + } + CouchbaseOperations::CouchbaseRequest temp_get_manifest_request; + CouchbaseOperations::CouchbaseResponse temp_get_manifest_response; + brpc::Controller temp_cntl; + temp_get_manifest_request.getCollectionManifest(); + channel->CallMethod(NULL, &temp_cntl, &temp_get_manifest_request, + &temp_get_manifest_response, NULL); + if (temp_cntl.Failed()) { + DEBUG_PRINT("Failed to get collection manifest: bRPC controller error " + << temp_cntl.ErrorText()); + return false; + } + string manifest_json; + if (!temp_get_manifest_response.popManifest(&manifest_json)) { + DEBUG_PRINT("Failed to parse response for refreshing collection Manifest: " + << temp_get_manifest_response.lastError()); + return false; + } + brpc::CouchbaseManifestManager::CollectionManifest manifest; + if (!common_metadata_tracking.jsonToCollectionManifest(manifest_json, + &manifest)) { + DEBUG_PRINT("Failed to parse collection manifest JSON"); + return false; + } + brpc::CouchbaseManifestManager::CollectionManifest cached_manifest; + if (!common_metadata_tracking.getBucketToCollectionManifest( + server, bucket, &cached_manifest)) { + // No cached manifest found, set the new one + if (!common_metadata_tracking.setBucketToCollectionManifest(server, bucket, + manifest)) { + DEBUG_PRINT("Failed to cache collection manifest for bucket " + << bucket << " on server " << server); + return false; + } + DEBUG_PRINT("Cached collection manifest for bucket " + << bucket << " on server " << server); + // also update the local cache + if (local_collection_manifest_cache != nullptr) { + (*local_collection_manifest_cache)[bucket] = manifest; + } + return true; + } + // Compare the UID with the cached one + // If they are different, refresh the cache + else if (manifest.uid != cached_manifest.uid) { + DEBUG_PRINT("Collection manifest has changed for bucket " + << bucket << " on server " << server); + if (!common_metadata_tracking.setBucketToCollectionManifest(server, bucket, + manifest)) { + DEBUG_PRINT("Failed to update cached collection manifest for bucket " + << bucket << " on server " << server); + return false; + } + DEBUG_PRINT("Updated cached collection manifest for bucket " + << bucket << " on server " << server); + // update the local cache as well + if (local_collection_manifest_cache != nullptr) { + (*local_collection_manifest_cache)[bucket] = manifest; + DEBUG_PRINT("Added to local collection manifest cache for bucket " + << bucket << " on server " << server); + } + return true; + } else { + DEBUG_PRINT("Collection manifest is already up-to-date for bucket " + << bucket << " on server " << server); + if (local_collection_manifest_cache != nullptr) { + if (local_collection_manifest_cache->find(bucket) != + local_collection_manifest_cache->end()) { + // if the bucket already exists in the local cache, check the UID + if ((*local_collection_manifest_cache)[bucket].uid != manifest.uid) { + // if the UID is different, update the local cache + (*local_collection_manifest_cache)[bucket] = manifest; + DEBUG_PRINT("Updated local collection manifest cache for bucket " + << bucket << " on server " << server); + } + } else { + // if the bucket does not exist in the local cache, add it + (*local_collection_manifest_cache)[bucket] = manifest; + DEBUG_PRINT("Added to local collection manifest cache for bucket " + << bucket << " on server " << server); + } + } + return false; + } +} + +uint32_t CouchbaseOperations::CouchbaseRequest::hashCrc32(const char* key, + size_t key_length) { + static const uint32_t crc32tab[256] = { + 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f, + 0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988, + 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2, + 0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7, + 0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9, + 0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172, + 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c, + 0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59, + 0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423, + 0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924, + 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106, + 0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433, + 0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d, + 0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e, + 0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950, + 0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65, + 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7, + 0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0, + 0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa, + 0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f, + 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81, + 0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a, + 0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84, + 0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1, + 0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb, + 0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc, + 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e, + 0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b, + 0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55, + 0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236, + 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28, + 0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d, + 0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f, + 0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38, + 0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242, + 0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777, + 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69, + 0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2, + 0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc, + 0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9, + 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693, + 0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94, + 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d, + }; + + uint64_t x; + uint32_t crc = UINT32_MAX; + + for (x = 0; x < key_length; x++) + crc = (crc >> 8) ^ crc32tab[(crc ^ (uint64_t)key[x]) & 0xff]; + +#ifdef __APPLE__ + return ((~crc) >> 16) % APPLE_VBUCKET_COUNT; +#else + return ((~crc) >> 16) % DEFAULT_VBUCKET_COUNT; +#endif +} + +void CouchbaseOperations::CouchbaseRequest::sharedCtor() { + _pipelined_count = 0; + _cached_size_ = 0; +} + +void CouchbaseOperations::CouchbaseRequest::sharedDtor() {} + +void CouchbaseOperations::CouchbaseRequest::setCachedSize(int size) const { + _cached_size_ = size; +} + +void CouchbaseOperations::CouchbaseRequest::Clear() { + _buf.clear(); + _pipelined_count = 0; +} + +// Support for scope level collections will be added in future. +// Get the Scope ID for a given scope name +// bool CouchbaseOperations::CouchbaseRequest::GetScopeId(const +// butil::StringPiece& scope_name) { +// if (scope_name.empty()) { +// DEBUG_PRINT("Empty scope name"); +// return false; +// } +// // Opcode 0xBC for Get Scope ID (see Collections.md) +// const policy::CouchbaseRequestHeader header = { +// policy::CB_MAGIC_REQUEST, +// policy::CB_GET_SCOPE_ID, +// butil::HostToNet16(scope_name.size()), +// 0, // no extras +// policy::CB_BINARY_RAW_BYTES, +// 0, // no vbucket +// butil::HostToNet32(scope_name.size()), +// 0, // opaque +// 0 // no CAS +// }; +// if (_buf.append(&header, sizeof(header))) { +// return false; +// } +// if (_buf.append(scope_name.data(), scope_name.size())) { +// return false; +// } +// ++_pipelined_count; +// return true; +// } + +bool CouchbaseOperations::CouchbaseRequest::selectBucketRequest( + const butil::StringPiece& bucket_name) { + if (bucket_name.empty()) { + DEBUG_PRINT("Empty bucket name"); + return false; + } + // construct the request header + const policy::CouchbaseRequestHeader header = { + policy::CB_MAGIC_REQUEST, + policy::CB_SELECT_BUCKET, + butil::HostToNet16(bucket_name.size()), + 0, + policy::CB_BINARY_RAW_BYTES, + 0, + butil::HostToNet32(bucket_name.size()), + 0, + 0}; + if (_buf.append(&header, sizeof(header))) { + DEBUG_PRINT("Failed to append header to buffer"); + return false; + } + if (_buf.append(bucket_name.data(), bucket_name.size())) { + DEBUG_PRINT("Failed to append bucket name to buffer"); + return false; + } + ++_pipelined_count; + return true; +} + +// HelloRequest sends a Hello request to the Couchbase server, which specifies +// the client features and capabilities. +// This is typically the first request sent after connecting to the server. +// It includes the agent name and a randomly generated connection ID in JSON +// format. +bool CouchbaseOperations::CouchbaseRequest::helloRequest() { + std::string agent = "brpc/1.0.0 ("; +#ifdef __APPLE__ + agent += "Darwin/"; +#elif defined(__linux__) + agent += "Linux/"; +#else + agent += "UnknownOS/"; +#endif +#if defined(__x86_64__) + agent += "x86_64"; +#elif defined(__aarch64__) + agent += "arm64"; +#else + agent += "unknown"; +#endif + agent += ";bssl/0x1010107f)"; + + // Generate a random connection ID as hex string + unsigned char raw_id[CONNECTION_ID_SIZE]; + FILE* urandom = fopen("/dev/urandom", "rb"); + if (!urandom || + fread(raw_id, 1, CONNECTION_ID_SIZE, urandom) != CONNECTION_ID_SIZE) { + if (urandom) fclose(urandom); + DEBUG_PRINT("Failed to generate random connection id"); + return false; + } + fclose(urandom); + char hex_id[RANDOM_ID_HEX_SIZE] = {0}; + for (int i = 0; i < CONNECTION_ID_SIZE; ++i) { + sprintf(hex_id + i * 2, "%02x", raw_id[i]); + } + + // Format key as JSON: {"a":"agent","i":"hex_id"} + std::string key = + std::string("{\"a\":\"") + agent + "\",\"i\":\"" + hex_id + "\"}"; + + const uint16_t key_len = key.size(); + uint16_t features[] = { + butil::HostToNet16(0x0001), // Datatype + butil::HostToNet16(0x0006), // XError + butil::HostToNet16(0x0007), // SelectBucket + butil::HostToNet16(0x000b), // Snappy + butil::HostToNet16(0x0012) // Collections + }; + + const uint32_t value_len = sizeof(features); + const uint32_t total_body_len = key_len + value_len; + + const policy::CouchbaseRequestHeader header = { + policy::CB_MAGIC_REQUEST, + policy::CB_HELLO_SELECT_FEATURES, + butil::HostToNet16(key_len), // key length + 0, // extras length + policy::CB_BINARY_RAW_BYTES, // data type + 0, // vbucket id + butil::HostToNet32(total_body_len), // total body length + 0, // opaque + 0 // cas value + }; + + if (_buf.append(&header, sizeof(header))) { + DEBUG_PRINT("Failed to append Hello header to buffer"); + return false; + } + if (_buf.append(key.data(), key_len)) { + DEBUG_PRINT("Failed to append Hello JSON key to buffer"); + return false; + } + if (_buf.append(reinterpret_cast<const char*>(features), value_len)) { + DEBUG_PRINT("Failed to append Hello features to buffer"); + return false; + } + ++_pipelined_count; + return true; +} + +bool CouchbaseOperations::CouchbaseRequest::authenticateRequest( + const butil::StringPiece& username, const butil::StringPiece& password) { + if (username.empty() || password.empty()) { + DEBUG_PRINT("Empty username or password"); + return false; + } + // insert the features to get enabled, calling function helloRequest() will do + // this. + if (!helloRequest()) { + DEBUG_PRINT("Failed to send helloRequest for authentication"); + return false; + } + // Construct the request header + constexpr char kPlainAuthCommand[] = "PLAIN"; + constexpr char kPadding[1] = {'\0'}; + const brpc::policy::CouchbaseRequestHeader header = { + brpc::policy::CB_MAGIC_REQUEST, + brpc::policy::CB_BINARY_SASL_AUTH, + butil::HostToNet16(sizeof(kPlainAuthCommand) - 1), + 0, + 0, + 0, + butil::HostToNet32(sizeof(kPlainAuthCommand) + 1 + username.length() * 2 + + password.length()), + 0, + 0}; + std::string auth_str; + auth_str.reserve(sizeof(header) + sizeof(kPlainAuthCommand) - 1 + + username.size() * 2 + password.size() + 2); + auth_str.append(reinterpret_cast<const char*>(&header), sizeof(header)); + auth_str.append(kPlainAuthCommand, sizeof(kPlainAuthCommand) - 1); + auth_str.append(username.data(), username.size()); + auth_str.append(kPadding, sizeof(kPadding)); + auth_str.append(username.data(), username.size()); + auth_str.append(kPadding, sizeof(kPadding)); + auth_str.append(password.data(), password.size()); + if (_buf.append(auth_str.data(), auth_str.size())) { + DEBUG_PRINT("Failed to append auth string to buffer"); + return false; + } + ++_pipelined_count; + return true; +} + +void CouchbaseOperations::CouchbaseRequest::MergeFrom( + const CouchbaseRequest& from) { + CHECK_NE(&from, this); + _buf.append(from._buf); + _pipelined_count += from._pipelined_count; +} + +bool CouchbaseOperations::CouchbaseRequest::IsInitialized() const { + return _pipelined_count != 0; +} + +void CouchbaseOperations::CouchbaseRequest::Swap(CouchbaseRequest* other) { + if (other != this) { + _buf.swap(other->_buf); + std::swap(_pipelined_count, other->_pipelined_count); + std::swap(_cached_size_, other->_cached_size_); + } +} + +void CouchbaseOperations::CouchbaseResponse::sharedCtor() { _cached_size_ = 0; } + +void CouchbaseOperations::CouchbaseResponse::sharedDtor() {} + +void CouchbaseOperations::CouchbaseResponse::setCachedSize(int size) const { + _cached_size_ = size; +} + +void CouchbaseOperations::CouchbaseResponse::Clear() {} + + +void CouchbaseOperations::CouchbaseResponse::MergeFrom( + const CouchbaseResponse& from) { + CHECK_NE(&from, this); + _err = from._err; + _buf.append(from._buf); +} + +bool CouchbaseOperations::CouchbaseResponse::IsInitialized() const { + return !_buf.empty(); +} + +void CouchbaseOperations::CouchbaseResponse::swap(CouchbaseResponse* other) { + if (other != this) { + _buf.swap(other->_buf); + std::swap(_cached_size_, other->_cached_size_); + } +} + +// =================================================================== + +const char* CouchbaseOperations::CouchbaseResponse::statusStr(Status st) { + switch (st) { + case STATUS_SUCCESS: + return "SUCCESS"; + case STATUS_KEY_ENOENT: + return "Key not found"; + case STATUS_KEY_EEXISTS: + return "Key already exists"; + case STATUS_E2BIG: + return "Value too large"; + case STATUS_EINVAL: + return "Invalid arguments"; + case STATUS_NOT_STORED: + return "Item not stored"; + case STATUS_DELTA_BADVAL: + return "Invalid delta value for increment/decrement"; + case STATUS_VBUCKET_BELONGS_TO_ANOTHER_SERVER: + return "VBucket belongs to another server"; + case STATUS_AUTH_ERROR: + return "Authentication failed"; + case STATUS_AUTH_CONTINUE: + return "Authentication continue"; + case STATUS_ERANGE: + return "Range error"; + case STATUS_ROLLBACK: + return "Rollback required"; + case STATUS_EACCESS: + return "Access denied"; + case STATUS_NOT_INITIALIZED: + return "Not initialized"; + case STATUS_UNKNOWN_COMMAND: + return "Unknown command"; + case STATUS_ENOMEM: + return "Out of memory"; + case STATUS_NOT_SUPPORTED: + return "Operation not supported"; + case STATUS_EINTERNAL: + return "Internal server error"; + case STATUS_EBUSY: + return "Server busy"; + case STATUS_ETMPFAIL: + return "Temporary failure"; + case STATUS_UNKNOWN_COLLECTION: + return "Unknown collection"; + case STATUS_NO_COLLECTIONS_MANIFEST: + return "No collections manifest"; + case STATUS_CANNOT_APPLY_COLLECTIONS_MANIFEST: + return "Cannot apply collections manifest"; + case STATUS_COLLECTIONS_MANIFEST_IS_AHEAD: + return "Collections manifest is ahead"; + case STATUS_UNKNOWN_SCOPE: + return "Unknown scope"; + case STATUS_DCP_STREAM_ID_INVALID: + return "Invalid DCP stream ID"; + case STATUS_DURABILITY_INVALID_LEVEL: + return "Invalid durability level"; + case STATUS_DURABILITY_IMPOSSIBLE: + return "Durability requirements impossible"; + case STATUS_SYNC_WRITE_IN_PROGRESS: + return "Synchronous write in progress"; + case STATUS_SYNC_WRITE_AMBIGUOUS: + return "Synchronous write result ambiguous"; + case STATUS_SYNC_WRITE_RE_COMMIT_IN_PROGRESS: + return "Synchronous write re-commit in progress"; + case STATUS_SUBDOC_PATH_NOT_FOUND: + return "Sub-document path not found"; + case STATUS_SUBDOC_PATH_MISMATCH: + return "Sub-document path mismatch"; + case STATUS_SUBDOC_PATH_EINVAL: + return "Invalid sub-document path"; + case STATUS_SUBDOC_PATH_E2BIG: + return "Sub-document path too deep"; + case STATUS_SUBDOC_DOC_E2DEEP: + return "Sub-document too deep"; + case STATUS_SUBDOC_VALUE_CANTINSERT: + return "Cannot insert sub-document value"; + case STATUS_SUBDOC_DOC_NOT_JSON: + return "Document is not JSON"; + case STATUS_SUBDOC_NUM_E2BIG: + return "Sub-document number too large"; + case STATUS_SUBDOC_DELTA_E2BIG: + return "Sub-document delta too large"; + case STATUS_SUBDOC_PATH_EEXISTS: + return "Sub-document path already exists"; + case STATUS_SUBDOC_VALUE_E2DEEP: + return "Sub-document value too deep"; + case STATUS_SUBDOC_INVALID_COMBO: + return "Invalid sub-document operation combination"; + case STATUS_SUBDOC_MULTI_PATH_FAILURE: + return "Sub-document multi-path operation failed"; + case STATUS_SUBDOC_SUCCESS_DELETED: + return "Sub-document operation succeeded on deleted document"; + case STATUS_SUBDOC_XATTR_INVALID_FLAG_COMBO: + return "Invalid extended attribute flag combination"; + case STATUS_SUBDOC_XATTR_INVALID_KEY_COMBO: + return "Invalid extended attribute key combination"; + case STATUS_SUBDOC_XATTR_UNKNOWN_MACRO: + return "Unknown extended attribute macro"; + case STATUS_SUBDOC_XATTR_UNKNOWN_VATTR: + return "Unknown virtual extended attribute"; + case STATUS_SUBDOC_XATTR_CANT_MODIFY_VATTR: + return "Cannot modify virtual extended attribute"; + case STATUS_SUBDOC_MULTI_PATH_FAILURE_DELETED: + return "Sub-document multi-path operation failed on deleted document"; + case STATUS_SUBDOC_INVALID_XATTR_ORDER: + return "Invalid extended attribute order"; + case STATUS_SUBDOC_XATTR_UNKNOWN_VATTR_MACRO: + return "Unknown virtual extended attribute macro"; + case STATUS_SUBDOC_CAN_ONLY_REVIVE_DELETED_DOCUMENTS: + return "Can only revive deleted documents"; + case STATUS_SUBDOC_DELETED_DOCUMENT_CANT_HAVE_VALUE: + return "Deleted document cannot have a value"; + case STATUS_XATTR_EINVAL: + return "Invalid extended attributes"; + } + return "Unknown status"; +} + +// Helper method to format error messages with status codes +std::string CouchbaseOperations::CouchbaseResponse::formatErrorMessage( + uint16_t status_code, const std::string& operation, + const std::string& error_msg) { + if (error_msg.empty()) { + return butil::string_printf("%s failed with status 0x%02x (%s)", + operation.c_str(), status_code, + statusStr((Status)status_code)); + } else { + return butil::string_printf( + "%s failed with status 0x%02x (%s): %s", operation.c_str(), status_code, + statusStr((Status)status_code), error_msg.c_str()); + } +} + +// MUST NOT have extras. +// MUST have key. +// MUST NOT have value. +bool CouchbaseOperations::CouchbaseRequest::getOrDelete( + uint8_t command, const butil::StringPiece& key, uint8_t coll_id) { + // Collection ID + uint8_t collection_id = coll_id; + uint16_t VBucket_id = hashCrc32(key.data(), key.size()); + const policy::CouchbaseRequestHeader header = { + policy::CB_MAGIC_REQUEST, command, + butil::HostToNet16( + key.size() + + 1), // Key + 0, // extras length + policy::CB_BINARY_RAW_BYTES, // data type + butil::HostToNet16(VBucket_id), + butil::HostToNet32(key.size() + + sizeof(collection_id)), // total body length includes + // key and collection id + 0, 0}; + if (_buf.append(&header, sizeof(header))) { + return false; + } + if (_buf.append(&collection_id, sizeof(collection_id))) { + return false; + } + if (_buf.append(key.data(), key.size())) { + return false; + } + ++_pipelined_count; + return true; +} + +// collectionID fetching either from the metadata cache or if doesn't exist then +// fetch from the server. +bool CouchbaseOperations::CouchbaseRequest::getCachedOrFetchCollectionId( + string collection_name, uint8_t* coll_id, + brpc::CouchbaseManifestManager* metadata_tracking, brpc::Channel* channel, + const string& server, const string& selected_bucket, + unordered_map<string, CouchbaseManifestManager::CollectionManifest>* local_cache) { + if (collection_name.empty()) { + DEBUG_PRINT("Empty collection name"); + return false; + } + if (channel == nullptr) { + DEBUG_PRINT("No channel found, make sure to call Authenticate() first"); + return false; + } + if (server.empty()) { + DEBUG_PRINT("Server is empty, make sure to call Authenticate() first"); + return false; + } + if (selected_bucket.empty()) { + DEBUG_PRINT("No bucket selected, make sure to call SelectBucket() first"); + return false; + } + + brpc::CouchbaseManifestManager::CollectionManifest manifest; + // check if the server/bucket exists in the cached collection manifest + if (!metadata_tracking->getBucketToCollectionManifest(server, selected_bucket, + &manifest)) { + DEBUG_PRINT("No cached collection manifest found for bucket " + << selected_bucket << " on server " << server + << ", fetching from server"); + // No cached manifest found, fetch from server + if (!metadata_tracking->refreshCollectionManifest(channel, server, selected_bucket, local_cache)) { + return false; + } + // local cache will also be updated in refreshCollectionManifest + // get the reference to collectionID from local cache + if (!getLocalCachedCollectionId(selected_bucket, "_default", + collection_name, coll_id)) { + // collectionID not found in the latest manifest fetched from server + return false; + } + // collectionID has been found in the latest manifest fetched from server + // and is stored in coll_id + return true; + } else { + // check if collection name to id mapping exists. + if (!metadata_tracking->getManifestToCollectionId( + &manifest, "_default", collection_name, coll_id)) { + // Just to verify that the collectionID does not exist in the manifest + // refresh manifest from server and try again + if (!metadata_tracking->refreshCollectionManifest(channel, server, selected_bucket, local_cache)) { + return false; + } + // local cache will also be updated in refreshCollectionManifest + // get the reference to collectionID from local cache + if (!getLocalCachedCollectionId(selected_bucket, "_default", + collection_name, coll_id)) { + // collectionID not found in the latest manifest fetched from server + return false; + } + // collectionID has been found in the latest manifest fetched from server + // and is stored in coll_id + return true; + } + // update the local cache with the manifest in global cache + (*local_collection_manifest_cache)[selected_bucket] = manifest; + // collectionID found in the cached manifest + return true; + } +} + +bool CouchbaseOperations::CouchbaseRequest::getRequest( + const butil::StringPiece& key, string collection_name, + brpc::Channel* channel, const string& server, const string& bucket) { + DEBUG_PRINT("getRequest called with key: " + << key << ", collection_name: " << collection_name + << ", server: " << server << ", bucket: " << bucket); + uint8_t coll_id = 0; // default collection ID + if (collection_name != "_default") { + // check if the local cache is empty or not. + if (local_collection_manifest_cache->empty()) { + DEBUG_PRINT("Local collection manifest cache is empty in getRequest"); + // if local cache is empty, goto global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "getRequest"); + return false; + } + } + // check if the collection id is available in the local cache + else if (!getLocalCachedCollectionId(bucket, "_default", collection_name, + &coll_id)) { + DEBUG_PRINT("Collection id not found in local cache in getRequest"); + // if not check in the global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "getRequest"); + return false; + } + } + } + DEBUG_PRINT("getRequest using coll_id: " << (int)coll_id); + return getOrDelete(policy::CB_BINARY_GET, key, coll_id); +} + +bool CouchbaseOperations::CouchbaseRequest::deleteRequest( + const butil::StringPiece& key, string collection_name, + brpc::Channel* channel, const string& server, const string& bucket) { + DEBUG_PRINT("deleteRequest called with key: " + << key << ", collection_name: " << collection_name + << ", server: " << server << ", bucket: " << bucket); + uint8_t coll_id = 0; // default collection ID + if (collection_name != "_default") { + // check if the local cache is empty or not. + if (local_collection_manifest_cache->empty()) { + DEBUG_PRINT("Local collection manifest cache is empty in deleteRequest"); + // if local cache is empty, goto global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "deleteRequest"); + return false; + } + } + // check if the collection id is available in the local cache + else if (!getLocalCachedCollectionId(bucket, "_default", collection_name, + &coll_id)) { + DEBUG_PRINT("Collection id not found in local cache in deleteRequest"); + // if not check in the global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "deleteRequest"); + return false; + } + } + } + DEBUG_PRINT("deleteRequest using coll_id: " << (int)coll_id); + return getOrDelete(policy::CB_BINARY_DELETE, key, coll_id); +} + +struct FlushHeaderWithExtras { + policy::CouchbaseRequestHeader header; + uint32_t exptime; +} __attribute__((packed)); +BAIDU_CASSERT(sizeof(FlushHeaderWithExtras) == 28, must_match); + +// MAY have extras. +// MUST NOT have key. +// MUST NOT have value. +// Extra data for flush: +// Byte/ 0 | 1 | 2 | 3 | +// / | | | | +// |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| +// +---------------+---------------+---------------+---------------+ +// 0| Expiration | +// +---------------+---------------+---------------+---------------+ +// Total 4 bytes +// Warning: Not tested +// bool CouchbaseOperations::CouchbaseRequest::FlushRequest(uint32_t timeout) { +// const uint8_t FLUSH_EXTRAS = (timeout == 0 ? 0 : 4); +// FlushHeaderWithExtras header_with_extras = { +// {policy::CB_MAGIC_REQUEST, policy::CB_BINARY_FLUSH, 0, FLUSH_EXTRAS, +// policy::CB_BINARY_RAW_BYTES, 0, butil::HostToNet32(FLUSH_EXTRAS), 0, +// 0}, +// butil::HostToNet32(timeout)}; +// if (FLUSH_EXTRAS == 0) { +// if (_buf.append(&header_with_extras.header, +// sizeof(policy::CouchbaseRequestHeader))) { +// return false; +// } +// } else { +// if (_buf.append(&header_with_extras, sizeof(header_with_extras))) { +// return false; +// } +// } +// ++_pipelined_count; +// return true; +// } + +// (if found): +// MUST have extras. +// MAY have key. +// MAY have value. +// Extra data for the get commands: +// Byte/ 0 | 1 | 2 | 3 | +// / | | | | +// |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| +// +---------------+---------------+---------------+---------------+ +// 0| Flags | +// +---------------+---------------+---------------+---------------+ +// Total 4 bytes +bool CouchbaseOperations::CouchbaseResponse::popGet(butil::IOBuf* value, + uint32_t* flags, + uint64_t* cas_value) { + const size_t n = _buf.size(); + policy::CouchbaseResponseHeader header; + if (n < sizeof(header)) { + butil::string_printf(&_err, "buffer is too small to contain a header"); + return false; + } + _buf.copy_to(&header, sizeof(header)); + if (header.command != (uint8_t)policy::CB_BINARY_GET) { + butil::string_printf(&_err, "not a GET response"); + return false; + } + if (n < sizeof(header) + header.total_body_length) { + butil::string_printf(&_err, "response=%u < header=%u + body=%u", + (unsigned)n, (unsigned)sizeof(header), + header.total_body_length); + return false; + } + if (header.status != (uint16_t)STATUS_SUCCESS) { + if (DBUG && header.extras_length != 0) { + DEBUG_PRINT("GET response must not have flags"); + } + if (DBUG && header.key_length != 0) { + DEBUG_PRINT("GET response must not have key"); + } + const int value_size = (int)header.total_body_length - + (int)header.extras_length - (int)header.key_length; + _status_code = header.status; + if (value_size < 0) { + butil::string_printf(&_err, "value_size=%d is non-negative", value_size); + return false; + } + _buf.pop_front(sizeof(header) + header.extras_length + header.key_length); + if (value_size > 0) { + std::string error_msg; + _buf.cutn(&error_msg, value_size); + _err = formatErrorMessage(header.status, "GET operation", error_msg); + } else { + _err = formatErrorMessage(header.status, "GET operation"); + } + return false; + } + if (header.extras_length != 4u) { + butil::string_printf( + &_err, "GET response must have flags as extras, actual length=%u", + header.extras_length); + return false; + } + if (header.key_length != 0) { + butil::string_printf(&_err, "GET response must not have key"); + return false; + } + const int value_size = (int)header.total_body_length - + (int)header.extras_length - (int)header.key_length; + if (value_size < 0) { + butil::string_printf(&_err, "value_size=%d is non-negative", value_size); + return false; + } + _buf.pop_front(sizeof(header)); + uint32_t raw_flags = 0; + _buf.cutn(&raw_flags, sizeof(raw_flags)); + if (flags) { + *flags = butil::NetToHost32(raw_flags); + } + if (value) { + value->clear(); + _buf.cutn(value, value_size); + } + if (cas_value) { + *cas_value = header.cas_value; + } + _err.clear(); + return true; +} + +bool CouchbaseOperations::CouchbaseResponse::popGet(std::string* value, + uint32_t* flags, + uint64_t* cas_value) { + butil::IOBuf tmp; + if (popGet(&tmp, flags, cas_value)) { + tmp.copy_to(value); + return true; + } + return false; +} + +// MUST NOT have extras +// MUST NOT have key +// MUST NOT have value +bool CouchbaseOperations::CouchbaseResponse::popDelete() { + return popStore(policy::CB_BINARY_DELETE, NULL); +} +// Warning: Not tested +// bool CouchbaseOperations::CouchbaseResponse::PopFlush() { +// return popStore(policy::CB_BINARY_FLUSH, NULL); +// } + +struct StoreHeaderWithExtras { + policy::CouchbaseRequestHeader header; + uint32_t flags; + uint32_t exptime; +} __attribute__((packed)); +BAIDU_CASSERT(sizeof(StoreHeaderWithExtras) == 32, must_match); +const size_t STORE_EXTRAS = + sizeof(StoreHeaderWithExtras) - sizeof(policy::CouchbaseRequestHeader); +// MUST have extras. +// MUST have key. +// MAY have value. +// Extra data for set/add/replace: +// Byte/ 0 | 1 | 2 | 3 | +// / | | | | +// |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| +// +---------------+---------------+---------------+---------------+ +// 0| Flags | +// +---------------+---------------+---------------+---------------+ +// 4| Expiration | +// +---------------+---------------+---------------+---------------+ +// Total 8 bytes +bool CouchbaseOperations::CouchbaseRequest::store( + uint8_t command, const butil::StringPiece& key, + const butil::StringPiece& value, uint32_t flags, uint32_t exptime, + uint64_t cas_value, uint8_t coll_id) { + // add collection id + // uint16_t collection_id = 0x00; + uint8_t collection_id = coll_id; + uint16_t vBucket_id = hashCrc32(key.data(), key.size()); + StoreHeaderWithExtras header_with_extras = { + {policy::CB_MAGIC_REQUEST, command, + butil::HostToNet16(key.size() + + 1), // collection id is not included in part of key, + // so not including it in key length. + STORE_EXTRAS, policy::CB_JSON, butil::HostToNet16(vBucket_id), + butil::HostToNet32(STORE_EXTRAS + sizeof(collection_id) + key.size() + + value.size()), // total body length + 0, butil::HostToNet64(cas_value)}, + butil::HostToNet32(flags), + butil::HostToNet32(exptime)}; + if (_buf.append(&header_with_extras, sizeof(header_with_extras))) { + return false; + } + if (_buf.append(&collection_id, sizeof(collection_id))) { + return false; + } + if (_buf.append(key.data(), key.size())) { + return false; + } + if (_buf.append(value.data(), value.size())) { + return false; + } + ++_pipelined_count; + return true; +} + +// MUST have CAS +// MUST NOT have extras +// MUST NOT have key +// MUST NOT have value +bool CouchbaseOperations::CouchbaseResponse::popStore(uint8_t command, + uint64_t* cas_value) { + const size_t n = _buf.size(); + policy::CouchbaseResponseHeader header; + if (n < sizeof(header)) { + butil::string_printf(&_err, "buffer is too small to contain a header"); + return false; + } + _buf.copy_to(&header, sizeof(header)); + if (header.command != command) { + butil::string_printf(&_err, "Not a STORE response"); + return false; + } + if (n < sizeof(header) + header.total_body_length) { + butil::string_printf(&_err, "Not enough data"); + return false; + } + if (DBUG && header.extras_length != 0) { + DEBUG_PRINT("STORE response must not have flags"); + } + if (DBUG && header.key_length != 0) { + DEBUG_PRINT("STORE response must not have key"); + } + int value_size = (int)header.total_body_length - (int)header.extras_length - + (int)header.key_length; + if (header.status != (uint16_t)STATUS_SUCCESS) { + _buf.pop_front(sizeof(header) + header.extras_length + header.key_length); + _status_code = header.status; + if (value_size > 0) { + std::string error_msg; + _buf.cutn(&error_msg, value_size); + _err = formatErrorMessage( + header.status, couchbaseBinaryCommandToString(command), error_msg); + } else { + _err = formatErrorMessage(header.status, + couchbaseBinaryCommandToString(command)); + } + return false; + } + if (DBUG && value_size != 0) { + DEBUG_PRINT("STORE response must not have value, actually=" << value_size); + } + _buf.pop_front(sizeof(header) + header.total_body_length); + if (cas_value) { + *cas_value = header.cas_value; + } + _err.clear(); + return true; +} + +const char* +CouchbaseOperations::CouchbaseResponse::couchbaseBinaryCommandToString( + uint8_t cmd) { + switch (cmd) { + case 0x1f: + return "CB_HELLO_SELECT_FEATURES"; + case 0x89: + return "CB_SELECT_BUCKET"; + case 0xBC: + return "CB_GET_SCOPE_ID"; + case 0x00: + return "CB_BINARY_GET"; + case 0x01: + return "CB_BINARY_SET"; + case 0x02: + return "CB_BINARY_ADD"; + case 0x03: + return "CB_BINARY_REPLACE"; + case 0x04: + return "CB_BINARY_DELETE"; + case 0x05: + return "CB_BINARY_INCREMENT"; + case 0x06: + return "CB_BINARY_DECREMENT"; + case 0x07: + return "CB_BINARY_QUIT"; + case 0x08: + return "CB_BINARY_FLUSH"; + case 0x09: + return "CB_BINARY_GETQ"; + case 0x0a: + return "CB_BINARY_NOOP"; + case 0x0b: + return "CB_BINARY_VERSION"; + case 0x0c: + return "CB_BINARY_GETK"; + case 0x0d: + return "CB_BINARY_GETKQ"; + case 0x0e: + return "CB_BINARY_APPEND"; + case 0x0f: + return "CB_BINARY_PREPEND"; + case 0x10: + return "CB_BINARY_STAT"; + case 0x11: + return "CB_BINARY_SETQ"; + case 0x12: + return "CB_BINARY_ADDQ"; + case 0x13: + return "CB_BINARY_REPLACEQ"; + case 0x14: + return "CB_BINARY_DELETEQ"; + case 0x15: + return "CB_BINARY_INCREMENTQ"; + case 0x16: + return "CB_BINARY_DECREMENTQ"; + case 0x17: + return "CB_BINARY_QUITQ"; + case 0x18: + return "CB_BINARY_FLUSHQ"; + case 0x19: + return "CB_BINARY_APPENDQ"; + case 0x1a: + return "CB_BINARY_PREPENDQ"; + case 0x1c: + return "CB_BINARY_TOUCH"; + case 0x1d: + return "CB_BINARY_GAT"; + case 0x1e: + return "CB_BINARY_GATQ"; + case 0x23: + return "CB_BINARY_GATK"; + case 0x24: + return "CB_BINARY_GATKQ"; + case 0x20: + return "CB_BINARY_SASL_LIST_MECHS"; + case 0x21: + return "CB_BINARY_SASL_AUTH"; + case 0x22: + return "CB_BINARY_SASL_STEP"; + case 0xb5: + return "CB_GET_CLUSTER_CONFIG"; + case 0xba: + return "CB_GET_COLLECTIONS_MANIFEST"; + case 0xbb: + return "CB_COLLECTIONS_GET_CID"; + default: + return "UNKNOWN_COMMAND"; + } +} + +bool CouchbaseOperations::CouchbaseRequest::upsertRequest( + const butil::StringPiece& key, const butil::StringPiece& value, + uint32_t flags, uint32_t exptime, uint64_t cas_value, + string collection_name, brpc::Channel* channel, const string& server, + const string& bucket) { + DEBUG_PRINT("upsertRequest called with key: " + << key << ", value: " << value + << ", collection_name: " << collection_name + << ", server: " << server << ", bucket: " << bucket); + uint8_t coll_id = 0; // default collection ID + if (collection_name != "_default") { + // check if the local cache is empty or not. + if (local_collection_manifest_cache->empty()) { + DEBUG_PRINT("Local collection manifest cache is empty in upsertRequest"); + // if local cache is empty, goto global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "upsertRequest"); + return false; + } + } + // check if the collection id is available in the local cache + else if (!getLocalCachedCollectionId(bucket, "_default", collection_name, + &coll_id)) { + DEBUG_PRINT("Collection id not found in local cache in upsertRequest"); + // if not check in the global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "upsertRequest"); + return false; + } + } + } + DEBUG_PRINT("upsertRequest using coll_id: " << (int)coll_id); + return store(policy::CB_BINARY_SET, key, value, flags, exptime, cas_value, + coll_id); +} + +// Using GetCollectionManifest instead of fetching collection ID directly +// bool CouchbaseOperations::CouchbaseRequest::GetCollectionId( +// const butil::StringPiece& scope_name, +// const butil::StringPiece& collection_name) { +// // Format the collection path as "scope.collection" +// std::string collection_path = +// scope_name.as_string() + "." + collection_name.as_string(); + +// const policy::CouchbaseRequestHeader header = { +// policy::CB_MAGIC_REQUEST, +// policy::CB_COLLECTIONS_GET_CID, +// butil::HostToNet16(collection_path.size()), +// 0, // no extras +// policy::CB_BINARY_RAW_BYTES, +// 0, // no vbucket +// butil::HostToNet32(collection_path.size()), +// 0, // opaque +// 0 // no CAS +// }; +// if (_buf.append(&header, sizeof(header))) { +// return false; +// } +// if (_buf.append(collection_path.data(), collection_path.size())) { +// return false; +// } +// ++_pipelined_count; +// return true; +// } + +bool CouchbaseOperations::CouchbaseRequest::getCollectionManifest() { + const policy::CouchbaseRequestHeader header = { + policy::CB_MAGIC_REQUEST, + policy::CB_GET_COLLECTIONS_MANIFEST, + 0, // no key + 0, // no extras + policy::CB_BINARY_RAW_BYTES, + 0, // no vbucket + 0, // no body (no key, no extras, no value) + 0, // opaque + 0 // no CAS + }; + if (_buf.append(&header, sizeof(header))) { + return false; + } + ++_pipelined_count; + return true; +} + +bool CouchbaseOperations::CouchbaseRequest::addRequest( + const butil::StringPiece& key, const butil::StringPiece& value, + uint32_t flags, uint32_t exptime, uint64_t cas_value, + string collection_name, brpc::Channel* channel, const string& server, + const string& bucket) { + DEBUG_PRINT("addRequest called with key: " + << key << ", value: " << value + << ", collection_name: " << collection_name + << ", server: " << server << ", bucket: " << bucket); + uint8_t coll_id = 0; // default collection ID + if (collection_name != "_default") { + // check if the local cache is empty or not. + if (local_collection_manifest_cache->empty()) { + DEBUG_PRINT("Local collection manifest cache is empty in addRequest"); + // if local cache is empty, goto global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "addRequest"); + return false; + } + } + // check if the collection id is available in the local cache + else if (!getLocalCachedCollectionId(bucket, "_default", collection_name, + &coll_id)) { + DEBUG_PRINT("Collection id not found in local cache in addRequest"); + // if not check in the global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "addRequest"); + return false; + } + } + } + DEBUG_PRINT("addRequest using coll_id: " << (int)coll_id); + return store(policy::CB_BINARY_ADD, key, value, flags, exptime, cas_value, + coll_id); +} + +// Warning: Not tested +// bool CouchbaseOperations::CouchbaseRequest::ReplaceRequest(const +// butil::StringPiece& key, +// const butil::StringPiece& value, uint32_t +// flags, uint32_t exptime, uint64_t cas_value, +// string collection_name, +// brpc::Channel* channel, const string& server, +// const string& bucket) { +// uint8_t coll_id = 0; // default collection ID +// if(collection_name != "_default"){ +// if(!getCachedOrFetchCollectionId(collection_name, &coll_id, +// metadata_tracking, channel, server, bucket, local_collection_manifest_cache)){ +// return false; +// } +// } +// return Store(policy::CB_BINARY_REPLACE, key, value, flags, exptime, +// cas_value, +// coll_id); +// } + +bool CouchbaseOperations::CouchbaseRequest::appendRequest( + const butil::StringPiece& key, const butil::StringPiece& value, + uint32_t flags, uint32_t exptime, uint64_t cas_value, + string collection_name, brpc::Channel* channel, const string& server, + const string& bucket) { + if (value.empty()) { + DEBUG_PRINT("value to append must be non-empty"); + return false; + } + uint8_t coll_id = 0; // default collection ID + if (collection_name != "_default") { + // check if the local cache is empty or not. + if (!local_collection_manifest_cache->empty()) { + // if local cache is empty, goto global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + return false; + } + } + // check if the collection id is available in the local cache + else if (!getLocalCachedCollectionId(bucket, "_default", collection_name, + &coll_id)) { + // if not check in the global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + return false; + } + } + } + return store(policy::CB_BINARY_APPEND, key, value, flags, exptime, cas_value, + coll_id); +} + +bool CouchbaseOperations::CouchbaseRequest::prependRequest( + const butil::StringPiece& key, const butil::StringPiece& value, + uint32_t flags, uint32_t exptime, uint64_t cas_value, + string collection_name, brpc::Channel* channel, const string& server, + const string& bucket) { + if (value.empty()) { + DEBUG_PRINT("value to prepend must be non-empty"); + return false; + } + uint8_t coll_id = 0; // default collection ID + if (collection_name != "_default") { + // check if the local cache is empty or not. + if (!local_collection_manifest_cache->empty()) { + // if local cache is empty, goto global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + return false; + } + } Review Comment: The logic for checking if the cache is empty is inverted. Line 1525 checks `if (!local_collection_manifest_cache->empty())` with a comment saying "if local cache is empty", but the condition actually checks if the cache is NOT empty. The logic should be `if (local_collection_manifest_cache->empty())` to match the comment and intended behavior. This is the same issue as in the `appendRequest` method. ########## src/brpc/policy/couchbase_protocol.cpp: ########## @@ -0,0 +1,238 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/policy/couchbase_protocol.h" + +#include <gflags/gflags.h> +#include <google/protobuf/descriptor.h> // MethodDescriptor +#include <google/protobuf/message.h> // Message + +#include "brpc/compress.h" // ParseFromCompressedData +#include "brpc/controller.h" // Controller +#include "brpc/couchbase.h" // CouchbaseRequest, CouchbaseResponse +#include "brpc/details/controller_private_accessor.h" +#include "brpc/details/server_private_accessor.h" +#include "brpc/policy/most_common_message.h" +#include "brpc/server.h" // Server +#include "brpc/socket.h" // Socket +#include "brpc/span.h" +#include "butil/containers/flat_map.h" +#include "butil/iobuf.h" // butil::IOBuf +#include "butil/logging.h" // LOG() +#include "butil/sys_byteorder.h" +#include "butil/time.h" + +namespace brpc { + +DECLARE_bool(enable_rpcz); + +namespace policy { + +BAIDU_CASSERT(sizeof(CouchbaseRequestHeader) == 24, must_match); +BAIDU_CASSERT(sizeof(CouchbaseResponseHeader) == 24, must_match); + +static uint64_t supported_cmd_map[8]; +static pthread_once_t supported_cmd_map_once = PTHREAD_ONCE_INIT; + +static void InitSupportedCommandMap() { + butil::bit_array_clear(supported_cmd_map, 256); + butil::bit_array_set(supported_cmd_map, CB_BINARY_GET); + butil::bit_array_set(supported_cmd_map, CB_HELLO_SELECT_FEATURES); + butil::bit_array_set(supported_cmd_map, CB_SELECT_BUCKET); + butil::bit_array_set(supported_cmd_map, CB_GET_SCOPE_ID); + butil::bit_array_set(supported_cmd_map, CB_BINARY_SET); + butil::bit_array_set(supported_cmd_map, CB_BINARY_ADD); + butil::bit_array_set(supported_cmd_map, CB_BINARY_REPLACE); + butil::bit_array_set(supported_cmd_map, CB_BINARY_DELETE); + butil::bit_array_set(supported_cmd_map, CB_BINARY_INCREMENT); + butil::bit_array_set(supported_cmd_map, CB_BINARY_DECREMENT); + butil::bit_array_set(supported_cmd_map, CB_BINARY_FLUSH); + butil::bit_array_set(supported_cmd_map, CB_BINARY_VERSION); + butil::bit_array_set(supported_cmd_map, CB_BINARY_NOOP); + butil::bit_array_set(supported_cmd_map, CB_BINARY_APPEND); + butil::bit_array_set(supported_cmd_map, CB_BINARY_PREPEND); + butil::bit_array_set(supported_cmd_map, CB_BINARY_STAT); + butil::bit_array_set(supported_cmd_map, CB_BINARY_TOUCH); + butil::bit_array_set(supported_cmd_map, CB_BINARY_SASL_AUTH); + // Collection management commands + butil::bit_array_set(supported_cmd_map, CB_GET_COLLECTIONS_MANIFEST); + butil::bit_array_set(supported_cmd_map, CB_COLLECTIONS_GET_CID); + butil::bit_array_set(supported_cmd_map, CB_COLLECTIONS_GET_SCOPE_ID); +} + +inline bool IsSupportedCommand(uint8_t command) { + pthread_once(&supported_cmd_map_once, InitSupportedCommandMap); + return butil::bit_array_get(supported_cmd_map, command); +} + +ParseResult ParseCouchbaseMessage(butil::IOBuf* source, Socket* socket, + bool /*read_eof*/, const void* /*arg*/) { + while (1) { + const uint8_t* p_cbmagic = (const uint8_t*)source->fetch1(); + if (NULL == p_cbmagic) { + return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); + } + if (*p_cbmagic != (uint8_t)CB_MAGIC_RESPONSE) { + return MakeParseError(PARSE_ERROR_TRY_OTHERS); + } + char buf[24]; + const uint8_t* p = (const uint8_t*)source->fetch(buf, sizeof(buf)); + if (NULL == p) { + return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); + } + const CouchbaseResponseHeader* header = (const CouchbaseResponseHeader*)p; + uint32_t total_body_length = butil::NetToHost32(header->total_body_length); + if (source->size() < sizeof(*header) + total_body_length) { + return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); + } + + if (!IsSupportedCommand(header->command)) { + LOG(WARNING) << "Not support command=" << header->command; + source->pop_front(sizeof(*header) + total_body_length); + return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); + } + + PipelinedInfo pi; + if (!socket->PopPipelinedInfo(&pi)) { + LOG(WARNING) << "No corresponding PipelinedInfo in socket, drop"; + source->pop_front(sizeof(*header) + total_body_length); + return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); + } + MostCommonMessage* msg = + static_cast<MostCommonMessage*>(socket->parsing_context()); + if (msg == NULL) { + msg = MostCommonMessage::Get(); + socket->reset_parsing_context(msg); + } + + // endianness conversions. + const CouchbaseResponseHeader local_header = { + header->magic, + header->command, + butil::NetToHost16(header->key_length), + header->extras_length, + header->data_type, + butil::NetToHost16(header->status), + total_body_length, + butil::NetToHost32(header->opaque), + butil::NetToHost64(header->cas_value), + }; + msg->meta.append(&local_header, sizeof(local_header)); + source->pop_front(sizeof(*header)); + source->cutn(&msg->meta, total_body_length); + if (++msg->pi.count >= pi.count) { + CHECK_EQ(msg->pi.count, pi.count); + msg = + static_cast<MostCommonMessage*>(socket->release_parsing_context()); + msg->pi = pi; + return MakeMessage(msg); + } else { + socket->GivebackPipelinedInfo(pi); + } + } +} + +void ProcessCouchbaseResponse(InputMessageBase* msg_base) { + const int64_t start_parse_us = butil::cpuwide_time_us(); + DestroyingPtr<MostCommonMessage> msg( + static_cast<MostCommonMessage*>(msg_base)); + + const bthread_id_t cid = msg->pi.id_wait; + Controller* cntl = NULL; + const int rc = bthread_id_lock(cid, (void**)&cntl); + if (rc != 0) { + LOG_IF(ERROR, rc != EINVAL && rc != EPERM) + << "Fail to lock correlation_id=" << cid << ": " << berror(rc); + return; + } + + ControllerPrivateAccessor accessor(cntl); + Span* span = accessor.span(); + if (span) { + span->set_base_real_us(msg->base_real_us()); + span->set_received_us(msg->received_us()); + span->set_response_size(msg->meta.length()); + span->set_start_parse_us(start_parse_us); + } + const int saved_error = cntl->ErrorCode(); + if (cntl->response() == NULL) { + cntl->SetFailed(ERESPONSE, "response is NULL!"); + } else if (cntl->response()->GetDescriptor() != + CouchbaseOperations::CouchbaseResponse::descriptor()) { + cntl->SetFailed(ERESPONSE, "Must be CouchbaseResponse"); + } else { + // We work around ParseFrom of pb which is just a placeholder. + ((CouchbaseOperations::CouchbaseResponse*)cntl->response())->rawBuffer() = + msg->meta.movable(); + if (msg->pi.count != accessor.pipelined_count()) { + cntl->SetFailed(ERESPONSE, + "pipelined_count=%d of response does " + "not equal request's=%d", + msg->pi.count, accessor.pipelined_count()); + } + } + // Unlocks correlation_id inside. Revert controller's + // error code if it version check of `cid' fails + msg.reset(); // optional, just release resource ASAP + accessor.OnResponse(cid, saved_error); +} + +void SerializeCouchbaseRequest(butil::IOBuf* buf, Controller* cntl, + const google::protobuf::Message* request) { + if (request == NULL) { + return cntl->SetFailed(EREQUEST, "request is NULL"); + } + if (request->GetDescriptor() != + CouchbaseOperations::CouchbaseRequest::descriptor()) { + return cntl->SetFailed(EREQUEST, "Must be CouchbaseRequest"); + } + const CouchbaseOperations::CouchbaseRequest* mr = + (const CouchbaseOperations::CouchbaseRequest*)request; + // We work around SerializeTo of pb which is just a placeholder. + *buf = mr->rawBuffer(); + ControllerPrivateAccessor(cntl).set_pipelined_count(mr->pipelinedCount()); +} + +void PackCouchbaseRequest(butil::IOBuf* buf, SocketMessage**, + uint64_t /*correlation_id*/, + const google::protobuf::MethodDescriptor*, + Controller* cntl, const butil::IOBuf& request, + const Authenticator* auth) { + if (auth) { + std::cout << "Appending authentication data to request" << std::endl; Review Comment: The debug output uses `std::cout` which is not appropriate for production code. Consider using the logging framework (LOG) or removing this debug output as the `DEBUG_PRINT` macro is already available for debug messages. ```suggestion LOG(INFO) << "Appending authentication data to request"; ``` ########## src/brpc/couchbase.cpp: ########## @@ -0,0 +1,2978 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/couchbase.h" + +#include <zlib.h> //for crc32 Vbucket_id + +// Debug flag for enabling debug statements +static bool DBUG = false; // Set to true to enable debug logs + +// Debug print macro +#define DEBUG_PRINT(msg) \ + do { \ + if (DBUG) { \ + std::cout << "[DEBUG] " << msg << std::endl; \ + } \ + } while (0) + +#include <iostream> + +#include "brpc/policy/couchbase_protocol.h" +#include "brpc/proto_base.pb.h" +#include "butil/logging.h" +#include "butil/macros.h" +#include "butil/string_printf.h" +#include "butil/sys_byteorder.h" +#include "butil/third_party/rapidjson/document.h" +#include "butil/third_party/rapidjson/rapidjson.h" + +namespace brpc { + +// Couchbase protocol constants +namespace { +[[maybe_unused]] constexpr uint32_t APPLE_VBUCKET_COUNT = 64; +constexpr uint32_t DEFAULT_VBUCKET_COUNT = 1024; +constexpr int CONNECTION_ID_SIZE = 33; +constexpr size_t RANDOM_ID_HEX_SIZE = 67; // 33 bytes * 2 + null terminator +} // namespace + +// Static member definitions +CouchbaseManifestManager* + CouchbaseOperations::CouchbaseRequest::metadata_tracking = + &common_metadata_tracking; + +bool brpc::CouchbaseManifestManager::setBucketToCollectionManifest( + string server, string bucket, + CouchbaseManifestManager::CollectionManifest manifest) { + // Then update the collection manifest with proper locking + { + UniqueLock write_lock(rw_bucket_to_collection_manifest_mutex_); + bucket_to_collection_manifest_[server][bucket] = manifest; + } + + return true; +} + +bool brpc::CouchbaseManifestManager::getBucketToCollectionManifest( + string server, string bucket, + CouchbaseManifestManager::CollectionManifest* manifest) { + SharedLock read_lock(rw_bucket_to_collection_manifest_mutex_); + auto it1 = bucket_to_collection_manifest_.find(server); + if (it1 == bucket_to_collection_manifest_.end()) { + return false; + } + auto it2 = it1->second.find(bucket); + if (it2 == it1->second.end()) { + return false; + } + *manifest = it2->second; + return true; +} + +bool brpc::CouchbaseManifestManager::getManifestToCollectionId( + CouchbaseManifestManager::CollectionManifest* manifest, string scope, + string collection, uint8_t* collection_id) { + if (manifest == nullptr || collection_id == nullptr) { + DEBUG_PRINT("Invalid input: manifest or collection_id is null"); + return false; + } + auto it1 = manifest->scope_to_collection_id_map.find(scope); + if (it1 == manifest->scope_to_collection_id_map.end()) { + DEBUG_PRINT("Scope: " << scope << " not found in manifest"); + return false; + } + auto it2 = it1->second.find(collection); + if (it2 == it1->second.end()) { + DEBUG_PRINT("Collection: " << collection + << " not found in scope: " << scope); + return false; + } + *collection_id = it2->second; + return true; +} + +bool CouchbaseManifestManager::jsonToCollectionManifest( + const string& json, + CouchbaseManifestManager::CollectionManifest* manifest) { + if (manifest == nullptr) { + DEBUG_PRINT("Invalid input: manifest is null"); + return false; + } + + // Clear existing data + manifest->uid.clear(); + manifest->scope_to_collection_id_map.clear(); + + if (json.empty()) { + DEBUG_PRINT("JSON string is empty"); + return false; + } + + // Parse JSON using RapidJSON + BUTIL_RAPIDJSON_NAMESPACE::Document document; + document.Parse(json.c_str()); + + if (document.HasParseError()) { + DEBUG_PRINT("Failed to parse JSON: " << document.GetParseError()); + return false; + } + + if (!document.IsObject()) { + DEBUG_PRINT("JSON root is not an object"); + return false; + } + + // Extract uid + if (document.HasMember("uid") && document["uid"].IsString()) { + manifest->uid = document["uid"].GetString(); + } else { + DEBUG_PRINT("Missing or invalid 'uid' field in JSON"); + return false; + } + + // Extract scopes + if (!document.HasMember("scopes") || !document["scopes"].IsArray()) { + DEBUG_PRINT("Missing or invalid 'scopes' field in JSON"); + return false; + } + + const BUTIL_RAPIDJSON_NAMESPACE::Value& scopes = document["scopes"]; + for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < scopes.Size(); ++i) { + const BUTIL_RAPIDJSON_NAMESPACE::Value& scope = scopes[i]; + + if (!scope.IsObject()) { + DEBUG_PRINT("Scope at index " << i << " is not an object"); + return false; + } + + // Extract scope name + if (!scope.HasMember("name") || !scope["name"].IsString()) { + DEBUG_PRINT("Missing or invalid 'name' field in scope at index " << i); + return false; + } + string scope_name = scope["name"].GetString(); + + // Extract collections + if (!scope.HasMember("collections") || !scope["collections"].IsArray()) { + DEBUG_PRINT("Missing or invalid 'collections' field in scope '" + << scope_name << "'"); + return false; + } + + const BUTIL_RAPIDJSON_NAMESPACE::Value& collections = scope["collections"]; + unordered_map<string, uint8_t> collection_map; + + for (BUTIL_RAPIDJSON_NAMESPACE::SizeType j = 0; j < collections.Size(); + ++j) { + const BUTIL_RAPIDJSON_NAMESPACE::Value& collection = collections[j]; + + if (!collection.IsObject()) { + DEBUG_PRINT("Collection at index " << j << " in scope '" << scope_name + << "' is not an object"); + return false; + } + + // Extract collection name + if (!collection.HasMember("name") || !collection["name"].IsString()) { + DEBUG_PRINT("Missing or invalid 'name' field in collection at index " + << j << " in scope '" << scope_name << "'"); + return false; + } + string collection_name = collection["name"].GetString(); + + // Extract collection uid (hex string) + if (!collection.HasMember("uid") || !collection["uid"].IsString()) { + DEBUG_PRINT("Missing or invalid 'uid' field in collection '" + << collection_name << "' in scope '" << scope_name << "'"); + return false; + } + string collection_uid_str = collection["uid"].GetString(); + + // Convert hex string to uint8_t + uint8_t collection_id = 0; + try { + // Convert hex string to integer + unsigned long uid_val = std::stoul(collection_uid_str, nullptr, 16); + if (uid_val > 255) { + DEBUG_PRINT( + "Collection uid '" + << collection_uid_str << "' exceeds uint8_t range in collection '" + << collection_name << "' in scope '" << scope_name << "'"); + return false; + } + collection_id = static_cast<uint8_t>(uid_val); + } catch (const std::exception& e) { + DEBUG_PRINT("Failed to parse collection uid '" + << collection_uid_str << "' as hex in collection '" + << collection_name << "' in scope '" << scope_name << ": " + << e.what()); + return false; + } + + // Add to collection map + collection_map[collection_name] = collection_id; + } + + // Add scope and its collections to manifest + manifest->scope_to_collection_id_map[scope_name] = + std::move(collection_map); + } + + return true; +} + +bool CouchbaseManifestManager::refreshCollectionManifest( + brpc::Channel* channel, const string& server, const string& bucket, + unordered_map<string, CollectionManifest>* local_collection_manifest_cache) { + // first fetch the manifest + // then compare the UID with the cached one + if (channel == nullptr) { + DEBUG_PRINT("No channel found, make sure to call Authenticate() first"); + return false; + } + if (server.empty()) { + DEBUG_PRINT("Server is empty, make sure to call Authenticate() first"); + return false; + } + if (bucket.empty()) { + DEBUG_PRINT("No bucket selected, make sure to call SelectBucket() first"); + return false; + } + CouchbaseOperations::CouchbaseRequest temp_get_manifest_request; + CouchbaseOperations::CouchbaseResponse temp_get_manifest_response; + brpc::Controller temp_cntl; + temp_get_manifest_request.getCollectionManifest(); + channel->CallMethod(NULL, &temp_cntl, &temp_get_manifest_request, + &temp_get_manifest_response, NULL); + if (temp_cntl.Failed()) { + DEBUG_PRINT("Failed to get collection manifest: bRPC controller error " + << temp_cntl.ErrorText()); + return false; + } + string manifest_json; + if (!temp_get_manifest_response.popManifest(&manifest_json)) { + DEBUG_PRINT("Failed to parse response for refreshing collection Manifest: " + << temp_get_manifest_response.lastError()); + return false; + } + brpc::CouchbaseManifestManager::CollectionManifest manifest; + if (!common_metadata_tracking.jsonToCollectionManifest(manifest_json, + &manifest)) { + DEBUG_PRINT("Failed to parse collection manifest JSON"); + return false; + } + brpc::CouchbaseManifestManager::CollectionManifest cached_manifest; + if (!common_metadata_tracking.getBucketToCollectionManifest( + server, bucket, &cached_manifest)) { + // No cached manifest found, set the new one + if (!common_metadata_tracking.setBucketToCollectionManifest(server, bucket, + manifest)) { + DEBUG_PRINT("Failed to cache collection manifest for bucket " + << bucket << " on server " << server); + return false; + } + DEBUG_PRINT("Cached collection manifest for bucket " + << bucket << " on server " << server); + // also update the local cache + if (local_collection_manifest_cache != nullptr) { + (*local_collection_manifest_cache)[bucket] = manifest; + } + return true; + } + // Compare the UID with the cached one + // If they are different, refresh the cache + else if (manifest.uid != cached_manifest.uid) { + DEBUG_PRINT("Collection manifest has changed for bucket " + << bucket << " on server " << server); + if (!common_metadata_tracking.setBucketToCollectionManifest(server, bucket, + manifest)) { + DEBUG_PRINT("Failed to update cached collection manifest for bucket " + << bucket << " on server " << server); + return false; + } + DEBUG_PRINT("Updated cached collection manifest for bucket " + << bucket << " on server " << server); + // update the local cache as well + if (local_collection_manifest_cache != nullptr) { + (*local_collection_manifest_cache)[bucket] = manifest; + DEBUG_PRINT("Added to local collection manifest cache for bucket " + << bucket << " on server " << server); + } + return true; + } else { + DEBUG_PRINT("Collection manifest is already up-to-date for bucket " + << bucket << " on server " << server); + if (local_collection_manifest_cache != nullptr) { + if (local_collection_manifest_cache->find(bucket) != + local_collection_manifest_cache->end()) { + // if the bucket already exists in the local cache, check the UID + if ((*local_collection_manifest_cache)[bucket].uid != manifest.uid) { + // if the UID is different, update the local cache + (*local_collection_manifest_cache)[bucket] = manifest; + DEBUG_PRINT("Updated local collection manifest cache for bucket " + << bucket << " on server " << server); + } + } else { + // if the bucket does not exist in the local cache, add it + (*local_collection_manifest_cache)[bucket] = manifest; + DEBUG_PRINT("Added to local collection manifest cache for bucket " + << bucket << " on server " << server); + } + } + return false; + } +} + +uint32_t CouchbaseOperations::CouchbaseRequest::hashCrc32(const char* key, + size_t key_length) { + static const uint32_t crc32tab[256] = { + 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f, + 0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988, + 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2, + 0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7, + 0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9, + 0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172, + 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c, + 0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59, + 0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423, + 0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924, + 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106, + 0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433, + 0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d, + 0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e, + 0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950, + 0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65, + 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7, + 0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0, + 0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa, + 0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f, + 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81, + 0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a, + 0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84, + 0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1, + 0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb, + 0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc, + 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e, + 0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b, + 0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55, + 0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236, + 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28, + 0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d, + 0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f, + 0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38, + 0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242, + 0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777, + 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69, + 0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2, + 0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc, + 0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9, + 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693, + 0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94, + 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d, + }; + + uint64_t x; + uint32_t crc = UINT32_MAX; + + for (x = 0; x < key_length; x++) + crc = (crc >> 8) ^ crc32tab[(crc ^ (uint64_t)key[x]) & 0xff]; + +#ifdef __APPLE__ + return ((~crc) >> 16) % APPLE_VBUCKET_COUNT; +#else + return ((~crc) >> 16) % DEFAULT_VBUCKET_COUNT; +#endif +} + +void CouchbaseOperations::CouchbaseRequest::sharedCtor() { + _pipelined_count = 0; + _cached_size_ = 0; +} + +void CouchbaseOperations::CouchbaseRequest::sharedDtor() {} + +void CouchbaseOperations::CouchbaseRequest::setCachedSize(int size) const { + _cached_size_ = size; +} + +void CouchbaseOperations::CouchbaseRequest::Clear() { + _buf.clear(); + _pipelined_count = 0; +} + +// Support for scope level collections will be added in future. +// Get the Scope ID for a given scope name +// bool CouchbaseOperations::CouchbaseRequest::GetScopeId(const +// butil::StringPiece& scope_name) { +// if (scope_name.empty()) { +// DEBUG_PRINT("Empty scope name"); +// return false; +// } +// // Opcode 0xBC for Get Scope ID (see Collections.md) +// const policy::CouchbaseRequestHeader header = { +// policy::CB_MAGIC_REQUEST, +// policy::CB_GET_SCOPE_ID, +// butil::HostToNet16(scope_name.size()), +// 0, // no extras +// policy::CB_BINARY_RAW_BYTES, +// 0, // no vbucket +// butil::HostToNet32(scope_name.size()), +// 0, // opaque +// 0 // no CAS +// }; +// if (_buf.append(&header, sizeof(header))) { +// return false; +// } +// if (_buf.append(scope_name.data(), scope_name.size())) { +// return false; +// } +// ++_pipelined_count; +// return true; +// } + +bool CouchbaseOperations::CouchbaseRequest::selectBucketRequest( + const butil::StringPiece& bucket_name) { + if (bucket_name.empty()) { + DEBUG_PRINT("Empty bucket name"); + return false; + } + // construct the request header + const policy::CouchbaseRequestHeader header = { + policy::CB_MAGIC_REQUEST, + policy::CB_SELECT_BUCKET, + butil::HostToNet16(bucket_name.size()), + 0, + policy::CB_BINARY_RAW_BYTES, + 0, + butil::HostToNet32(bucket_name.size()), + 0, + 0}; + if (_buf.append(&header, sizeof(header))) { + DEBUG_PRINT("Failed to append header to buffer"); + return false; + } + if (_buf.append(bucket_name.data(), bucket_name.size())) { + DEBUG_PRINT("Failed to append bucket name to buffer"); + return false; + } + ++_pipelined_count; + return true; +} + +// HelloRequest sends a Hello request to the Couchbase server, which specifies +// the client features and capabilities. +// This is typically the first request sent after connecting to the server. +// It includes the agent name and a randomly generated connection ID in JSON +// format. +bool CouchbaseOperations::CouchbaseRequest::helloRequest() { + std::string agent = "brpc/1.0.0 ("; +#ifdef __APPLE__ + agent += "Darwin/"; +#elif defined(__linux__) + agent += "Linux/"; +#else + agent += "UnknownOS/"; +#endif +#if defined(__x86_64__) + agent += "x86_64"; +#elif defined(__aarch64__) + agent += "arm64"; +#else + agent += "unknown"; +#endif + agent += ";bssl/0x1010107f)"; + + // Generate a random connection ID as hex string + unsigned char raw_id[CONNECTION_ID_SIZE]; + FILE* urandom = fopen("/dev/urandom", "rb"); + if (!urandom || + fread(raw_id, 1, CONNECTION_ID_SIZE, urandom) != CONNECTION_ID_SIZE) { + if (urandom) fclose(urandom); + DEBUG_PRINT("Failed to generate random connection id"); + return false; + } + fclose(urandom); + char hex_id[RANDOM_ID_HEX_SIZE] = {0}; + for (int i = 0; i < CONNECTION_ID_SIZE; ++i) { + sprintf(hex_id + i * 2, "%02x", raw_id[i]); + } + + // Format key as JSON: {"a":"agent","i":"hex_id"} + std::string key = + std::string("{\"a\":\"") + agent + "\",\"i\":\"" + hex_id + "\"}"; + + const uint16_t key_len = key.size(); + uint16_t features[] = { + butil::HostToNet16(0x0001), // Datatype + butil::HostToNet16(0x0006), // XError + butil::HostToNet16(0x0007), // SelectBucket + butil::HostToNet16(0x000b), // Snappy + butil::HostToNet16(0x0012) // Collections + }; + + const uint32_t value_len = sizeof(features); + const uint32_t total_body_len = key_len + value_len; + + const policy::CouchbaseRequestHeader header = { + policy::CB_MAGIC_REQUEST, + policy::CB_HELLO_SELECT_FEATURES, + butil::HostToNet16(key_len), // key length + 0, // extras length + policy::CB_BINARY_RAW_BYTES, // data type + 0, // vbucket id + butil::HostToNet32(total_body_len), // total body length + 0, // opaque + 0 // cas value + }; + + if (_buf.append(&header, sizeof(header))) { + DEBUG_PRINT("Failed to append Hello header to buffer"); + return false; + } + if (_buf.append(key.data(), key_len)) { + DEBUG_PRINT("Failed to append Hello JSON key to buffer"); + return false; + } + if (_buf.append(reinterpret_cast<const char*>(features), value_len)) { + DEBUG_PRINT("Failed to append Hello features to buffer"); + return false; + } + ++_pipelined_count; + return true; +} + +bool CouchbaseOperations::CouchbaseRequest::authenticateRequest( + const butil::StringPiece& username, const butil::StringPiece& password) { + if (username.empty() || password.empty()) { + DEBUG_PRINT("Empty username or password"); + return false; + } + // insert the features to get enabled, calling function helloRequest() will do + // this. + if (!helloRequest()) { + DEBUG_PRINT("Failed to send helloRequest for authentication"); + return false; + } + // Construct the request header + constexpr char kPlainAuthCommand[] = "PLAIN"; + constexpr char kPadding[1] = {'\0'}; + const brpc::policy::CouchbaseRequestHeader header = { + brpc::policy::CB_MAGIC_REQUEST, + brpc::policy::CB_BINARY_SASL_AUTH, + butil::HostToNet16(sizeof(kPlainAuthCommand) - 1), + 0, + 0, + 0, + butil::HostToNet32(sizeof(kPlainAuthCommand) + 1 + username.length() * 2 + + password.length()), + 0, + 0}; + std::string auth_str; + auth_str.reserve(sizeof(header) + sizeof(kPlainAuthCommand) - 1 + + username.size() * 2 + password.size() + 2); + auth_str.append(reinterpret_cast<const char*>(&header), sizeof(header)); + auth_str.append(kPlainAuthCommand, sizeof(kPlainAuthCommand) - 1); + auth_str.append(username.data(), username.size()); + auth_str.append(kPadding, sizeof(kPadding)); + auth_str.append(username.data(), username.size()); + auth_str.append(kPadding, sizeof(kPadding)); + auth_str.append(password.data(), password.size()); + if (_buf.append(auth_str.data(), auth_str.size())) { + DEBUG_PRINT("Failed to append auth string to buffer"); + return false; + } + ++_pipelined_count; + return true; +} + +void CouchbaseOperations::CouchbaseRequest::MergeFrom( + const CouchbaseRequest& from) { + CHECK_NE(&from, this); + _buf.append(from._buf); + _pipelined_count += from._pipelined_count; +} + +bool CouchbaseOperations::CouchbaseRequest::IsInitialized() const { + return _pipelined_count != 0; +} + +void CouchbaseOperations::CouchbaseRequest::Swap(CouchbaseRequest* other) { + if (other != this) { + _buf.swap(other->_buf); + std::swap(_pipelined_count, other->_pipelined_count); + std::swap(_cached_size_, other->_cached_size_); + } +} + +void CouchbaseOperations::CouchbaseResponse::sharedCtor() { _cached_size_ = 0; } + +void CouchbaseOperations::CouchbaseResponse::sharedDtor() {} + +void CouchbaseOperations::CouchbaseResponse::setCachedSize(int size) const { + _cached_size_ = size; +} + +void CouchbaseOperations::CouchbaseResponse::Clear() {} + + +void CouchbaseOperations::CouchbaseResponse::MergeFrom( + const CouchbaseResponse& from) { + CHECK_NE(&from, this); + _err = from._err; + _buf.append(from._buf); +} + +bool CouchbaseOperations::CouchbaseResponse::IsInitialized() const { + return !_buf.empty(); +} + +void CouchbaseOperations::CouchbaseResponse::swap(CouchbaseResponse* other) { + if (other != this) { + _buf.swap(other->_buf); + std::swap(_cached_size_, other->_cached_size_); + } +} + +// =================================================================== + +const char* CouchbaseOperations::CouchbaseResponse::statusStr(Status st) { + switch (st) { + case STATUS_SUCCESS: + return "SUCCESS"; + case STATUS_KEY_ENOENT: + return "Key not found"; + case STATUS_KEY_EEXISTS: + return "Key already exists"; + case STATUS_E2BIG: + return "Value too large"; + case STATUS_EINVAL: + return "Invalid arguments"; + case STATUS_NOT_STORED: + return "Item not stored"; + case STATUS_DELTA_BADVAL: + return "Invalid delta value for increment/decrement"; + case STATUS_VBUCKET_BELONGS_TO_ANOTHER_SERVER: + return "VBucket belongs to another server"; + case STATUS_AUTH_ERROR: + return "Authentication failed"; + case STATUS_AUTH_CONTINUE: + return "Authentication continue"; + case STATUS_ERANGE: + return "Range error"; + case STATUS_ROLLBACK: + return "Rollback required"; + case STATUS_EACCESS: + return "Access denied"; + case STATUS_NOT_INITIALIZED: + return "Not initialized"; + case STATUS_UNKNOWN_COMMAND: + return "Unknown command"; + case STATUS_ENOMEM: + return "Out of memory"; + case STATUS_NOT_SUPPORTED: + return "Operation not supported"; + case STATUS_EINTERNAL: + return "Internal server error"; + case STATUS_EBUSY: + return "Server busy"; + case STATUS_ETMPFAIL: + return "Temporary failure"; + case STATUS_UNKNOWN_COLLECTION: + return "Unknown collection"; + case STATUS_NO_COLLECTIONS_MANIFEST: + return "No collections manifest"; + case STATUS_CANNOT_APPLY_COLLECTIONS_MANIFEST: + return "Cannot apply collections manifest"; + case STATUS_COLLECTIONS_MANIFEST_IS_AHEAD: + return "Collections manifest is ahead"; + case STATUS_UNKNOWN_SCOPE: + return "Unknown scope"; + case STATUS_DCP_STREAM_ID_INVALID: + return "Invalid DCP stream ID"; + case STATUS_DURABILITY_INVALID_LEVEL: + return "Invalid durability level"; + case STATUS_DURABILITY_IMPOSSIBLE: + return "Durability requirements impossible"; + case STATUS_SYNC_WRITE_IN_PROGRESS: + return "Synchronous write in progress"; + case STATUS_SYNC_WRITE_AMBIGUOUS: + return "Synchronous write result ambiguous"; + case STATUS_SYNC_WRITE_RE_COMMIT_IN_PROGRESS: + return "Synchronous write re-commit in progress"; + case STATUS_SUBDOC_PATH_NOT_FOUND: + return "Sub-document path not found"; + case STATUS_SUBDOC_PATH_MISMATCH: + return "Sub-document path mismatch"; + case STATUS_SUBDOC_PATH_EINVAL: + return "Invalid sub-document path"; + case STATUS_SUBDOC_PATH_E2BIG: + return "Sub-document path too deep"; + case STATUS_SUBDOC_DOC_E2DEEP: + return "Sub-document too deep"; + case STATUS_SUBDOC_VALUE_CANTINSERT: + return "Cannot insert sub-document value"; + case STATUS_SUBDOC_DOC_NOT_JSON: + return "Document is not JSON"; + case STATUS_SUBDOC_NUM_E2BIG: + return "Sub-document number too large"; + case STATUS_SUBDOC_DELTA_E2BIG: + return "Sub-document delta too large"; + case STATUS_SUBDOC_PATH_EEXISTS: + return "Sub-document path already exists"; + case STATUS_SUBDOC_VALUE_E2DEEP: + return "Sub-document value too deep"; + case STATUS_SUBDOC_INVALID_COMBO: + return "Invalid sub-document operation combination"; + case STATUS_SUBDOC_MULTI_PATH_FAILURE: + return "Sub-document multi-path operation failed"; + case STATUS_SUBDOC_SUCCESS_DELETED: + return "Sub-document operation succeeded on deleted document"; + case STATUS_SUBDOC_XATTR_INVALID_FLAG_COMBO: + return "Invalid extended attribute flag combination"; + case STATUS_SUBDOC_XATTR_INVALID_KEY_COMBO: + return "Invalid extended attribute key combination"; + case STATUS_SUBDOC_XATTR_UNKNOWN_MACRO: + return "Unknown extended attribute macro"; + case STATUS_SUBDOC_XATTR_UNKNOWN_VATTR: + return "Unknown virtual extended attribute"; + case STATUS_SUBDOC_XATTR_CANT_MODIFY_VATTR: + return "Cannot modify virtual extended attribute"; + case STATUS_SUBDOC_MULTI_PATH_FAILURE_DELETED: + return "Sub-document multi-path operation failed on deleted document"; + case STATUS_SUBDOC_INVALID_XATTR_ORDER: + return "Invalid extended attribute order"; + case STATUS_SUBDOC_XATTR_UNKNOWN_VATTR_MACRO: + return "Unknown virtual extended attribute macro"; + case STATUS_SUBDOC_CAN_ONLY_REVIVE_DELETED_DOCUMENTS: + return "Can only revive deleted documents"; + case STATUS_SUBDOC_DELETED_DOCUMENT_CANT_HAVE_VALUE: + return "Deleted document cannot have a value"; + case STATUS_XATTR_EINVAL: + return "Invalid extended attributes"; + } + return "Unknown status"; +} + +// Helper method to format error messages with status codes +std::string CouchbaseOperations::CouchbaseResponse::formatErrorMessage( + uint16_t status_code, const std::string& operation, + const std::string& error_msg) { + if (error_msg.empty()) { + return butil::string_printf("%s failed with status 0x%02x (%s)", + operation.c_str(), status_code, + statusStr((Status)status_code)); + } else { + return butil::string_printf( + "%s failed with status 0x%02x (%s): %s", operation.c_str(), status_code, + statusStr((Status)status_code), error_msg.c_str()); + } +} + +// MUST NOT have extras. +// MUST have key. +// MUST NOT have value. +bool CouchbaseOperations::CouchbaseRequest::getOrDelete( + uint8_t command, const butil::StringPiece& key, uint8_t coll_id) { + // Collection ID + uint8_t collection_id = coll_id; + uint16_t VBucket_id = hashCrc32(key.data(), key.size()); + const policy::CouchbaseRequestHeader header = { + policy::CB_MAGIC_REQUEST, command, + butil::HostToNet16( + key.size() + + 1), // Key + 0, // extras length + policy::CB_BINARY_RAW_BYTES, // data type + butil::HostToNet16(VBucket_id), + butil::HostToNet32(key.size() + + sizeof(collection_id)), // total body length includes + // key and collection id + 0, 0}; + if (_buf.append(&header, sizeof(header))) { + return false; + } + if (_buf.append(&collection_id, sizeof(collection_id))) { + return false; + } + if (_buf.append(key.data(), key.size())) { + return false; + } + ++_pipelined_count; + return true; +} + +// collectionID fetching either from the metadata cache or if doesn't exist then +// fetch from the server. +bool CouchbaseOperations::CouchbaseRequest::getCachedOrFetchCollectionId( + string collection_name, uint8_t* coll_id, + brpc::CouchbaseManifestManager* metadata_tracking, brpc::Channel* channel, + const string& server, const string& selected_bucket, + unordered_map<string, CouchbaseManifestManager::CollectionManifest>* local_cache) { + if (collection_name.empty()) { + DEBUG_PRINT("Empty collection name"); + return false; + } + if (channel == nullptr) { + DEBUG_PRINT("No channel found, make sure to call Authenticate() first"); + return false; + } + if (server.empty()) { + DEBUG_PRINT("Server is empty, make sure to call Authenticate() first"); + return false; + } + if (selected_bucket.empty()) { + DEBUG_PRINT("No bucket selected, make sure to call SelectBucket() first"); + return false; + } + + brpc::CouchbaseManifestManager::CollectionManifest manifest; + // check if the server/bucket exists in the cached collection manifest + if (!metadata_tracking->getBucketToCollectionManifest(server, selected_bucket, + &manifest)) { + DEBUG_PRINT("No cached collection manifest found for bucket " + << selected_bucket << " on server " << server + << ", fetching from server"); + // No cached manifest found, fetch from server + if (!metadata_tracking->refreshCollectionManifest(channel, server, selected_bucket, local_cache)) { + return false; + } + // local cache will also be updated in refreshCollectionManifest + // get the reference to collectionID from local cache + if (!getLocalCachedCollectionId(selected_bucket, "_default", + collection_name, coll_id)) { + // collectionID not found in the latest manifest fetched from server + return false; + } + // collectionID has been found in the latest manifest fetched from server + // and is stored in coll_id + return true; + } else { + // check if collection name to id mapping exists. + if (!metadata_tracking->getManifestToCollectionId( + &manifest, "_default", collection_name, coll_id)) { + // Just to verify that the collectionID does not exist in the manifest + // refresh manifest from server and try again + if (!metadata_tracking->refreshCollectionManifest(channel, server, selected_bucket, local_cache)) { + return false; + } + // local cache will also be updated in refreshCollectionManifest + // get the reference to collectionID from local cache + if (!getLocalCachedCollectionId(selected_bucket, "_default", + collection_name, coll_id)) { + // collectionID not found in the latest manifest fetched from server + return false; + } + // collectionID has been found in the latest manifest fetched from server + // and is stored in coll_id + return true; + } + // update the local cache with the manifest in global cache + (*local_collection_manifest_cache)[selected_bucket] = manifest; + // collectionID found in the cached manifest + return true; + } +} + +bool CouchbaseOperations::CouchbaseRequest::getRequest( + const butil::StringPiece& key, string collection_name, + brpc::Channel* channel, const string& server, const string& bucket) { + DEBUG_PRINT("getRequest called with key: " + << key << ", collection_name: " << collection_name + << ", server: " << server << ", bucket: " << bucket); + uint8_t coll_id = 0; // default collection ID + if (collection_name != "_default") { + // check if the local cache is empty or not. + if (local_collection_manifest_cache->empty()) { + DEBUG_PRINT("Local collection manifest cache is empty in getRequest"); + // if local cache is empty, goto global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "getRequest"); + return false; + } + } + // check if the collection id is available in the local cache + else if (!getLocalCachedCollectionId(bucket, "_default", collection_name, + &coll_id)) { + DEBUG_PRINT("Collection id not found in local cache in getRequest"); + // if not check in the global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "getRequest"); + return false; + } + } + } + DEBUG_PRINT("getRequest using coll_id: " << (int)coll_id); + return getOrDelete(policy::CB_BINARY_GET, key, coll_id); +} + +bool CouchbaseOperations::CouchbaseRequest::deleteRequest( + const butil::StringPiece& key, string collection_name, + brpc::Channel* channel, const string& server, const string& bucket) { + DEBUG_PRINT("deleteRequest called with key: " + << key << ", collection_name: " << collection_name + << ", server: " << server << ", bucket: " << bucket); + uint8_t coll_id = 0; // default collection ID + if (collection_name != "_default") { + // check if the local cache is empty or not. + if (local_collection_manifest_cache->empty()) { + DEBUG_PRINT("Local collection manifest cache is empty in deleteRequest"); + // if local cache is empty, goto global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "deleteRequest"); + return false; + } + } + // check if the collection id is available in the local cache + else if (!getLocalCachedCollectionId(bucket, "_default", collection_name, + &coll_id)) { + DEBUG_PRINT("Collection id not found in local cache in deleteRequest"); + // if not check in the global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "deleteRequest"); + return false; + } + } + } + DEBUG_PRINT("deleteRequest using coll_id: " << (int)coll_id); + return getOrDelete(policy::CB_BINARY_DELETE, key, coll_id); +} + +struct FlushHeaderWithExtras { + policy::CouchbaseRequestHeader header; + uint32_t exptime; +} __attribute__((packed)); +BAIDU_CASSERT(sizeof(FlushHeaderWithExtras) == 28, must_match); + +// MAY have extras. +// MUST NOT have key. +// MUST NOT have value. +// Extra data for flush: +// Byte/ 0 | 1 | 2 | 3 | +// / | | | | +// |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| +// +---------------+---------------+---------------+---------------+ +// 0| Expiration | +// +---------------+---------------+---------------+---------------+ +// Total 4 bytes +// Warning: Not tested +// bool CouchbaseOperations::CouchbaseRequest::FlushRequest(uint32_t timeout) { +// const uint8_t FLUSH_EXTRAS = (timeout == 0 ? 0 : 4); +// FlushHeaderWithExtras header_with_extras = { +// {policy::CB_MAGIC_REQUEST, policy::CB_BINARY_FLUSH, 0, FLUSH_EXTRAS, +// policy::CB_BINARY_RAW_BYTES, 0, butil::HostToNet32(FLUSH_EXTRAS), 0, +// 0}, +// butil::HostToNet32(timeout)}; +// if (FLUSH_EXTRAS == 0) { +// if (_buf.append(&header_with_extras.header, +// sizeof(policy::CouchbaseRequestHeader))) { +// return false; +// } +// } else { +// if (_buf.append(&header_with_extras, sizeof(header_with_extras))) { +// return false; +// } +// } +// ++_pipelined_count; +// return true; +// } + +// (if found): +// MUST have extras. +// MAY have key. +// MAY have value. +// Extra data for the get commands: +// Byte/ 0 | 1 | 2 | 3 | +// / | | | | +// |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| +// +---------------+---------------+---------------+---------------+ +// 0| Flags | +// +---------------+---------------+---------------+---------------+ +// Total 4 bytes +bool CouchbaseOperations::CouchbaseResponse::popGet(butil::IOBuf* value, + uint32_t* flags, + uint64_t* cas_value) { + const size_t n = _buf.size(); + policy::CouchbaseResponseHeader header; + if (n < sizeof(header)) { + butil::string_printf(&_err, "buffer is too small to contain a header"); + return false; + } + _buf.copy_to(&header, sizeof(header)); + if (header.command != (uint8_t)policy::CB_BINARY_GET) { + butil::string_printf(&_err, "not a GET response"); + return false; + } + if (n < sizeof(header) + header.total_body_length) { + butil::string_printf(&_err, "response=%u < header=%u + body=%u", + (unsigned)n, (unsigned)sizeof(header), + header.total_body_length); + return false; + } + if (header.status != (uint16_t)STATUS_SUCCESS) { + if (DBUG && header.extras_length != 0) { + DEBUG_PRINT("GET response must not have flags"); + } + if (DBUG && header.key_length != 0) { + DEBUG_PRINT("GET response must not have key"); + } + const int value_size = (int)header.total_body_length - + (int)header.extras_length - (int)header.key_length; + _status_code = header.status; + if (value_size < 0) { + butil::string_printf(&_err, "value_size=%d is non-negative", value_size); + return false; + } + _buf.pop_front(sizeof(header) + header.extras_length + header.key_length); + if (value_size > 0) { + std::string error_msg; + _buf.cutn(&error_msg, value_size); + _err = formatErrorMessage(header.status, "GET operation", error_msg); + } else { + _err = formatErrorMessage(header.status, "GET operation"); + } + return false; + } + if (header.extras_length != 4u) { + butil::string_printf( + &_err, "GET response must have flags as extras, actual length=%u", + header.extras_length); + return false; + } + if (header.key_length != 0) { + butil::string_printf(&_err, "GET response must not have key"); + return false; + } + const int value_size = (int)header.total_body_length - + (int)header.extras_length - (int)header.key_length; + if (value_size < 0) { + butil::string_printf(&_err, "value_size=%d is non-negative", value_size); + return false; + } + _buf.pop_front(sizeof(header)); + uint32_t raw_flags = 0; + _buf.cutn(&raw_flags, sizeof(raw_flags)); + if (flags) { + *flags = butil::NetToHost32(raw_flags); + } + if (value) { + value->clear(); + _buf.cutn(value, value_size); + } + if (cas_value) { + *cas_value = header.cas_value; + } + _err.clear(); + return true; +} + +bool CouchbaseOperations::CouchbaseResponse::popGet(std::string* value, + uint32_t* flags, + uint64_t* cas_value) { + butil::IOBuf tmp; + if (popGet(&tmp, flags, cas_value)) { + tmp.copy_to(value); + return true; + } + return false; +} + +// MUST NOT have extras +// MUST NOT have key +// MUST NOT have value +bool CouchbaseOperations::CouchbaseResponse::popDelete() { + return popStore(policy::CB_BINARY_DELETE, NULL); +} +// Warning: Not tested +// bool CouchbaseOperations::CouchbaseResponse::PopFlush() { +// return popStore(policy::CB_BINARY_FLUSH, NULL); +// } + +struct StoreHeaderWithExtras { + policy::CouchbaseRequestHeader header; + uint32_t flags; + uint32_t exptime; +} __attribute__((packed)); +BAIDU_CASSERT(sizeof(StoreHeaderWithExtras) == 32, must_match); +const size_t STORE_EXTRAS = + sizeof(StoreHeaderWithExtras) - sizeof(policy::CouchbaseRequestHeader); +// MUST have extras. +// MUST have key. +// MAY have value. +// Extra data for set/add/replace: +// Byte/ 0 | 1 | 2 | 3 | +// / | | | | +// |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| +// +---------------+---------------+---------------+---------------+ +// 0| Flags | +// +---------------+---------------+---------------+---------------+ +// 4| Expiration | +// +---------------+---------------+---------------+---------------+ +// Total 8 bytes +bool CouchbaseOperations::CouchbaseRequest::store( + uint8_t command, const butil::StringPiece& key, + const butil::StringPiece& value, uint32_t flags, uint32_t exptime, + uint64_t cas_value, uint8_t coll_id) { + // add collection id + // uint16_t collection_id = 0x00; + uint8_t collection_id = coll_id; + uint16_t vBucket_id = hashCrc32(key.data(), key.size()); + StoreHeaderWithExtras header_with_extras = { + {policy::CB_MAGIC_REQUEST, command, + butil::HostToNet16(key.size() + + 1), // collection id is not included in part of key, + // so not including it in key length. + STORE_EXTRAS, policy::CB_JSON, butil::HostToNet16(vBucket_id), + butil::HostToNet32(STORE_EXTRAS + sizeof(collection_id) + key.size() + + value.size()), // total body length + 0, butil::HostToNet64(cas_value)}, + butil::HostToNet32(flags), + butil::HostToNet32(exptime)}; + if (_buf.append(&header_with_extras, sizeof(header_with_extras))) { + return false; + } + if (_buf.append(&collection_id, sizeof(collection_id))) { + return false; + } + if (_buf.append(key.data(), key.size())) { + return false; + } + if (_buf.append(value.data(), value.size())) { + return false; + } + ++_pipelined_count; + return true; +} + +// MUST have CAS +// MUST NOT have extras +// MUST NOT have key +// MUST NOT have value +bool CouchbaseOperations::CouchbaseResponse::popStore(uint8_t command, + uint64_t* cas_value) { + const size_t n = _buf.size(); + policy::CouchbaseResponseHeader header; + if (n < sizeof(header)) { + butil::string_printf(&_err, "buffer is too small to contain a header"); + return false; + } + _buf.copy_to(&header, sizeof(header)); + if (header.command != command) { + butil::string_printf(&_err, "Not a STORE response"); + return false; + } + if (n < sizeof(header) + header.total_body_length) { + butil::string_printf(&_err, "Not enough data"); + return false; + } + if (DBUG && header.extras_length != 0) { + DEBUG_PRINT("STORE response must not have flags"); + } + if (DBUG && header.key_length != 0) { + DEBUG_PRINT("STORE response must not have key"); + } + int value_size = (int)header.total_body_length - (int)header.extras_length - + (int)header.key_length; + if (header.status != (uint16_t)STATUS_SUCCESS) { + _buf.pop_front(sizeof(header) + header.extras_length + header.key_length); + _status_code = header.status; + if (value_size > 0) { + std::string error_msg; + _buf.cutn(&error_msg, value_size); + _err = formatErrorMessage( + header.status, couchbaseBinaryCommandToString(command), error_msg); + } else { + _err = formatErrorMessage(header.status, + couchbaseBinaryCommandToString(command)); + } + return false; + } + if (DBUG && value_size != 0) { + DEBUG_PRINT("STORE response must not have value, actually=" << value_size); + } + _buf.pop_front(sizeof(header) + header.total_body_length); + if (cas_value) { + *cas_value = header.cas_value; + } + _err.clear(); + return true; +} + +const char* +CouchbaseOperations::CouchbaseResponse::couchbaseBinaryCommandToString( + uint8_t cmd) { + switch (cmd) { + case 0x1f: + return "CB_HELLO_SELECT_FEATURES"; + case 0x89: + return "CB_SELECT_BUCKET"; + case 0xBC: + return "CB_GET_SCOPE_ID"; + case 0x00: + return "CB_BINARY_GET"; + case 0x01: + return "CB_BINARY_SET"; + case 0x02: + return "CB_BINARY_ADD"; + case 0x03: + return "CB_BINARY_REPLACE"; + case 0x04: + return "CB_BINARY_DELETE"; + case 0x05: + return "CB_BINARY_INCREMENT"; + case 0x06: + return "CB_BINARY_DECREMENT"; + case 0x07: + return "CB_BINARY_QUIT"; + case 0x08: + return "CB_BINARY_FLUSH"; + case 0x09: + return "CB_BINARY_GETQ"; + case 0x0a: + return "CB_BINARY_NOOP"; + case 0x0b: + return "CB_BINARY_VERSION"; + case 0x0c: + return "CB_BINARY_GETK"; + case 0x0d: + return "CB_BINARY_GETKQ"; + case 0x0e: + return "CB_BINARY_APPEND"; + case 0x0f: + return "CB_BINARY_PREPEND"; + case 0x10: + return "CB_BINARY_STAT"; + case 0x11: + return "CB_BINARY_SETQ"; + case 0x12: + return "CB_BINARY_ADDQ"; + case 0x13: + return "CB_BINARY_REPLACEQ"; + case 0x14: + return "CB_BINARY_DELETEQ"; + case 0x15: + return "CB_BINARY_INCREMENTQ"; + case 0x16: + return "CB_BINARY_DECREMENTQ"; + case 0x17: + return "CB_BINARY_QUITQ"; + case 0x18: + return "CB_BINARY_FLUSHQ"; + case 0x19: + return "CB_BINARY_APPENDQ"; + case 0x1a: + return "CB_BINARY_PREPENDQ"; + case 0x1c: + return "CB_BINARY_TOUCH"; + case 0x1d: + return "CB_BINARY_GAT"; + case 0x1e: + return "CB_BINARY_GATQ"; + case 0x23: + return "CB_BINARY_GATK"; + case 0x24: + return "CB_BINARY_GATKQ"; + case 0x20: + return "CB_BINARY_SASL_LIST_MECHS"; + case 0x21: + return "CB_BINARY_SASL_AUTH"; + case 0x22: + return "CB_BINARY_SASL_STEP"; + case 0xb5: + return "CB_GET_CLUSTER_CONFIG"; + case 0xba: + return "CB_GET_COLLECTIONS_MANIFEST"; + case 0xbb: + return "CB_COLLECTIONS_GET_CID"; + default: + return "UNKNOWN_COMMAND"; + } +} + +bool CouchbaseOperations::CouchbaseRequest::upsertRequest( + const butil::StringPiece& key, const butil::StringPiece& value, + uint32_t flags, uint32_t exptime, uint64_t cas_value, + string collection_name, brpc::Channel* channel, const string& server, + const string& bucket) { + DEBUG_PRINT("upsertRequest called with key: " + << key << ", value: " << value + << ", collection_name: " << collection_name + << ", server: " << server << ", bucket: " << bucket); + uint8_t coll_id = 0; // default collection ID + if (collection_name != "_default") { + // check if the local cache is empty or not. + if (local_collection_manifest_cache->empty()) { + DEBUG_PRINT("Local collection manifest cache is empty in upsertRequest"); + // if local cache is empty, goto global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "upsertRequest"); + return false; + } + } + // check if the collection id is available in the local cache + else if (!getLocalCachedCollectionId(bucket, "_default", collection_name, + &coll_id)) { + DEBUG_PRINT("Collection id not found in local cache in upsertRequest"); + // if not check in the global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "upsertRequest"); + return false; + } + } + } + DEBUG_PRINT("upsertRequest using coll_id: " << (int)coll_id); + return store(policy::CB_BINARY_SET, key, value, flags, exptime, cas_value, + coll_id); +} + +// Using GetCollectionManifest instead of fetching collection ID directly +// bool CouchbaseOperations::CouchbaseRequest::GetCollectionId( +// const butil::StringPiece& scope_name, +// const butil::StringPiece& collection_name) { +// // Format the collection path as "scope.collection" +// std::string collection_path = +// scope_name.as_string() + "." + collection_name.as_string(); + +// const policy::CouchbaseRequestHeader header = { +// policy::CB_MAGIC_REQUEST, +// policy::CB_COLLECTIONS_GET_CID, +// butil::HostToNet16(collection_path.size()), +// 0, // no extras +// policy::CB_BINARY_RAW_BYTES, +// 0, // no vbucket +// butil::HostToNet32(collection_path.size()), +// 0, // opaque +// 0 // no CAS +// }; +// if (_buf.append(&header, sizeof(header))) { +// return false; +// } +// if (_buf.append(collection_path.data(), collection_path.size())) { +// return false; +// } +// ++_pipelined_count; +// return true; +// } + +bool CouchbaseOperations::CouchbaseRequest::getCollectionManifest() { + const policy::CouchbaseRequestHeader header = { + policy::CB_MAGIC_REQUEST, + policy::CB_GET_COLLECTIONS_MANIFEST, + 0, // no key + 0, // no extras + policy::CB_BINARY_RAW_BYTES, + 0, // no vbucket + 0, // no body (no key, no extras, no value) + 0, // opaque + 0 // no CAS + }; + if (_buf.append(&header, sizeof(header))) { + return false; + } + ++_pipelined_count; + return true; +} + +bool CouchbaseOperations::CouchbaseRequest::addRequest( + const butil::StringPiece& key, const butil::StringPiece& value, + uint32_t flags, uint32_t exptime, uint64_t cas_value, + string collection_name, brpc::Channel* channel, const string& server, + const string& bucket) { + DEBUG_PRINT("addRequest called with key: " + << key << ", value: " << value + << ", collection_name: " << collection_name + << ", server: " << server << ", bucket: " << bucket); + uint8_t coll_id = 0; // default collection ID + if (collection_name != "_default") { + // check if the local cache is empty or not. + if (local_collection_manifest_cache->empty()) { + DEBUG_PRINT("Local collection manifest cache is empty in addRequest"); + // if local cache is empty, goto global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "addRequest"); + return false; + } + } + // check if the collection id is available in the local cache + else if (!getLocalCachedCollectionId(bucket, "_default", collection_name, + &coll_id)) { + DEBUG_PRINT("Collection id not found in local cache in addRequest"); + // if not check in the global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + DEBUG_PRINT( + "Failed to get collection id from global cache or server in " + "addRequest"); + return false; + } + } + } + DEBUG_PRINT("addRequest using coll_id: " << (int)coll_id); + return store(policy::CB_BINARY_ADD, key, value, flags, exptime, cas_value, + coll_id); +} + +// Warning: Not tested +// bool CouchbaseOperations::CouchbaseRequest::ReplaceRequest(const +// butil::StringPiece& key, +// const butil::StringPiece& value, uint32_t +// flags, uint32_t exptime, uint64_t cas_value, +// string collection_name, +// brpc::Channel* channel, const string& server, +// const string& bucket) { +// uint8_t coll_id = 0; // default collection ID +// if(collection_name != "_default"){ +// if(!getCachedOrFetchCollectionId(collection_name, &coll_id, +// metadata_tracking, channel, server, bucket, local_collection_manifest_cache)){ +// return false; +// } +// } +// return Store(policy::CB_BINARY_REPLACE, key, value, flags, exptime, +// cas_value, +// coll_id); +// } + +bool CouchbaseOperations::CouchbaseRequest::appendRequest( + const butil::StringPiece& key, const butil::StringPiece& value, + uint32_t flags, uint32_t exptime, uint64_t cas_value, + string collection_name, brpc::Channel* channel, const string& server, + const string& bucket) { + if (value.empty()) { + DEBUG_PRINT("value to append must be non-empty"); + return false; + } + uint8_t coll_id = 0; // default collection ID + if (collection_name != "_default") { + // check if the local cache is empty or not. + if (!local_collection_manifest_cache->empty()) { + // if local cache is empty, goto global cache or fetch from server + if (!getCachedOrFetchCollectionId(collection_name, &coll_id, + metadata_tracking, channel, server, + bucket, local_collection_manifest_cache)) { + return false; + } + } Review Comment: The logic for checking if the cache is empty is inverted. Line 1490 checks `if (!local_collection_manifest_cache->empty())` with a comment saying "if local cache is empty", but the condition actually checks if the cache is NOT empty. The logic should be `if (local_collection_manifest_cache->empty())` to match the comment and intended behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
