szaszm commented on code in PR #1875:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1875#discussion_r1840723434


##########
extensions/couchbase/controllerservices/CouchbaseClusterService.cpp:
##########
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CouchbaseClusterService.h"
+#include "couchbase/codec/raw_binary_transcoder.hxx"
+#include "couchbase/codec/raw_string_transcoder.hxx"
+#include "couchbase/codec/raw_json_transcoder.hxx"
+
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::couchbase {
+
+CouchbaseErrorType CouchbaseClient::getErrorType(const std::error_code& 
error_code) {
+  for (const auto& temporary_error : temporary_connection_errors) {
+    if (static_cast<int>(temporary_error) == error_code.value()) {
+      return CouchbaseErrorType::TEMPORARY;
+    }
+  }
+  return CouchbaseErrorType::FATAL;
+}
+
+nonstd::expected<::couchbase::collection, CouchbaseErrorType> 
CouchbaseClient::getCollection(const CouchbaseCollection& collection) {
+  auto connection_result = establishConnection();
+  if (!connection_result) {
+    return nonstd::make_unexpected(connection_result.error());
+  }
+  return 
cluster_->bucket(collection.bucket_name).scope(collection.scope_name).collection(collection.collection_name);
+}
+
+nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> 
CouchbaseClient::upsert(const CouchbaseCollection& collection,
+    CouchbaseValueType document_type, const std::string& document_id, const 
std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) {
+  auto collection_result = getCollection(collection);
+  if (!collection_result.has_value()) {
+    return nonstd::make_unexpected(collection_result.error());
+  }
+
+  std::pair<::couchbase::error, ::couchbase::mutation_result> result;
+  if (document_type == CouchbaseValueType::Json) {
+    result = 
collection_result->upsert<::couchbase::codec::raw_json_transcoder>(document_id, 
buffer, options).get();
+  } else if (document_type == CouchbaseValueType::String) {
+    std::string data_str(reinterpret_cast<const char*>(buffer.data()), 
buffer.size());
+    result = 
collection_result->upsert<::couchbase::codec::raw_string_transcoder>(document_id,
 data_str, options).get();
+  } else {
+    result = 
collection_result->upsert<::couchbase::codec::raw_binary_transcoder>(document_id,
 buffer, options).get();
+  }
+  auto& [upsert_err, upsert_resp] = result;
+  if (upsert_err.ec()) {
+    // ambiguous_timeout should not be retried as we do not know if the insert 
was successful or not
+    if (getErrorType(upsert_err.ec()) == CouchbaseErrorType::TEMPORARY && 
upsert_err.ec().value() != 
static_cast<int>(::couchbase::errc::common::ambiguous_timeout)) {
+      logger_->log_error("Failed to upsert document '{}' to collection 
'{}.{}.{}' due to temporary issue, error code: '{}', message: '{}'",
+        document_id, collection.bucket_name, collection.scope_name, 
collection.collection_name, upsert_err.ec(), upsert_err.message());
+      return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY);
+    }
+    logger_->log_error("Failed to upsert document '{}' to collection 
'{}.{}.{}' with error code: '{}', message: '{}'",
+      document_id, collection.bucket_name, collection.scope_name, 
collection.collection_name, upsert_err.ec(), upsert_err.message());
+    return nonstd::make_unexpected(CouchbaseErrorType::FATAL);
+  } else {
+    const uint64_t partition_uuid = (upsert_resp.mutation_token().has_value() 
? upsert_resp.mutation_token()->partition_uuid() : 0);
+    const uint64_t sequence_number = (upsert_resp.mutation_token().has_value() 
? upsert_resp.mutation_token()->sequence_number() : 0);
+    const uint16_t partition_id = (upsert_resp.mutation_token().has_value() ? 
upsert_resp.mutation_token()->partition_id() : 0);
+    return CouchbaseUpsertResult {
+      collection.bucket_name,
+      upsert_resp.cas().value(),
+      partition_uuid,
+      sequence_number,
+      partition_id
+    };
+  }
+}
+
+void CouchbaseClient::close() {
+  if (cluster_) {
+    cluster_->close().wait();
+  }
+}
+

Review Comment:
   I think the connection should be closed in the destructor. And since we're 
writing a destructor, we should also declare the other four special member 
functions (rule of five), even if just as `= delete`.



##########
extensions/couchbase/controllerservices/CouchbaseClusterService.cpp:
##########
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CouchbaseClusterService.h"
+#include "couchbase/codec/raw_binary_transcoder.hxx"
+#include "couchbase/codec/raw_string_transcoder.hxx"
+#include "couchbase/codec/raw_json_transcoder.hxx"
+
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::couchbase {
+
+CouchbaseErrorType CouchbaseClient::getErrorType(const std::error_code& 
error_code) {
+  for (const auto& temporary_error : temporary_connection_errors) {
+    if (static_cast<int>(temporary_error) == error_code.value()) {
+      return CouchbaseErrorType::TEMPORARY;
+    }
+  }
+  return CouchbaseErrorType::FATAL;
+}
+
+nonstd::expected<::couchbase::collection, CouchbaseErrorType> 
CouchbaseClient::getCollection(const CouchbaseCollection& collection) {
+  auto connection_result = establishConnection();
+  if (!connection_result) {
+    return nonstd::make_unexpected(connection_result.error());
+  }
+  return 
cluster_->bucket(collection.bucket_name).scope(collection.scope_name).collection(collection.collection_name);
+}
+
+nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> 
CouchbaseClient::upsert(const CouchbaseCollection& collection,
+    CouchbaseValueType document_type, const std::string& document_id, const 
std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) {
+  auto collection_result = getCollection(collection);
+  if (!collection_result.has_value()) {
+    return nonstd::make_unexpected(collection_result.error());
+  }
+
+  std::pair<::couchbase::error, ::couchbase::mutation_result> result;
+  if (document_type == CouchbaseValueType::Json) {
+    result = 
collection_result->upsert<::couchbase::codec::raw_json_transcoder>(document_id, 
buffer, options).get();
+  } else if (document_type == CouchbaseValueType::String) {
+    std::string data_str(reinterpret_cast<const char*>(buffer.data()), 
buffer.size());
+    result = 
collection_result->upsert<::couchbase::codec::raw_string_transcoder>(document_id,
 data_str, options).get();
+  } else {
+    result = 
collection_result->upsert<::couchbase::codec::raw_binary_transcoder>(document_id,
 buffer, options).get();
+  }
+  auto& [upsert_err, upsert_resp] = result;
+  if (upsert_err.ec()) {
+    // ambiguous_timeout should not be retried as we do not know if the insert 
was successful or not
+    if (getErrorType(upsert_err.ec()) == CouchbaseErrorType::TEMPORARY && 
upsert_err.ec().value() != 
static_cast<int>(::couchbase::errc::common::ambiguous_timeout)) {
+      logger_->log_error("Failed to upsert document '{}' to collection 
'{}.{}.{}' due to temporary issue, error code: '{}', message: '{}'",
+        document_id, collection.bucket_name, collection.scope_name, 
collection.collection_name, upsert_err.ec(), upsert_err.message());
+      return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY);
+    }
+    logger_->log_error("Failed to upsert document '{}' to collection 
'{}.{}.{}' with error code: '{}', message: '{}'",
+      document_id, collection.bucket_name, collection.scope_name, 
collection.collection_name, upsert_err.ec(), upsert_err.message());
+    return nonstd::make_unexpected(CouchbaseErrorType::FATAL);
+  } else {
+    const uint64_t partition_uuid = (upsert_resp.mutation_token().has_value() 
? upsert_resp.mutation_token()->partition_uuid() : 0);
+    const uint64_t sequence_number = (upsert_resp.mutation_token().has_value() 
? upsert_resp.mutation_token()->sequence_number() : 0);
+    const uint16_t partition_id = (upsert_resp.mutation_token().has_value() ? 
upsert_resp.mutation_token()->partition_id() : 0);
+    return CouchbaseUpsertResult {
+      collection.bucket_name,
+      upsert_resp.cas().value(),
+      partition_uuid,
+      sequence_number,
+      partition_id
+    };
+  }
+}
+
+void CouchbaseClient::close() {
+  if (cluster_) {
+    cluster_->close().wait();
+  }
+}
+
+nonstd::expected<void, CouchbaseErrorType> 
CouchbaseClient::establishConnection() {
+  if (cluster_) {
+    return {};
+  }
+
+  auto options = ::couchbase::cluster_options(username_, password_);
+  auto [connect_err, cluster] = 
::couchbase::cluster::connect(connection_string_, options).get();
+  if (connect_err.ec()) {
+    logger_->log_error("Failed to connect to Couchbase cluster with error 
code: '{}' and message: '{}'", connect_err.ec(), connect_err.message());
+    return nonstd::make_unexpected(getErrorType(connect_err.ec()));
+  }
+  cluster_ = std::move(cluster);
+  return {};
+}

Review Comment:
   This doesn't look thread-safe: `cluster_` is checked and assigned without 
any synchronization. That may be a problem if a Couchbase processor is 
multi-threaded and two threads concurrently try to establish a connection, or 
if two processors share a client controller service.



##########
extensions/couchbase/controllerservices/CouchbaseClusterService.h:
##########
@@ -0,0 +1,163 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "core/controller/ControllerService.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/PropertyType.h"
+#include "couchbase/cluster.hxx"
+#include "core/ProcessContext.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::couchbase {
+
+struct CouchbaseCollection {
+  std::string bucket_name;
+  std::string scope_name;
+  std::string collection_name;
+};
+
+struct CouchbaseUpsertResult {
+  std::string bucket_name;
+  std::uint64_t cas{0};
+  std::uint64_t sequence_number{0};
+  std::uint64_t partition_uuid{0};
+  std::uint16_t partition_id{0};
+};
+
+enum class CouchbaseValueType {
+  Json,
+  Binary,
+  String
+};
+
+enum class CouchbaseErrorType {
+  FATAL,
+  TEMPORARY,
+};
+
+class CouchbaseClient {
+ public:
+  CouchbaseClient(std::string connection_string, std::string username, 
std::string password, const std::shared_ptr<core::logging::Logger>& logger)
+    : connection_string_(std::move(connection_string)), 
username_(std::move(username)), password_(std::move(password)), logger_(logger) 
{
+  }
+
+  nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const 
CouchbaseCollection& collection, CouchbaseValueType document_type, const 
std::string& document_id,
+    const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& 
options);
+  nonstd::expected<void, CouchbaseErrorType> establishConnection();
+  void close();
+
+ private:
+  static constexpr std::array<::couchbase::errc::common, 9> 
temporary_connection_errors = {
+    ::couchbase::errc::common::temporary_failure,
+    ::couchbase::errc::common::request_canceled,
+    ::couchbase::errc::common::internal_server_failure,
+    ::couchbase::errc::common::cas_mismatch,
+    ::couchbase::errc::common::ambiguous_timeout,
+    ::couchbase::errc::common::unambiguous_timeout,
+    ::couchbase::errc::common::rate_limited,
+    ::couchbase::errc::common::quota_limited
+  };

Review Comment:
   I'd move this to the .cpp file, either into an anonymous namespace or 
inlined into `getErrorType`.
   
   I would also rename `getErrorType` to `isTemporaryError`, and make it return 
bool. It could be a simple call to `std::any_of`.



##########
Windows.md:
##########
@@ -122,7 +122,7 @@ A basic working CMake configuration can be inferred from 
the `win_build_vs.bat`.
 ```
 mkdir build
 cd build
-cmake -G "Visual Studio 17 2022" -A x64 
-DMINIFI_INCLUDE_VC_REDIST_MERGE_MODULES=OFF -DTEST_CUSTOM_WEL_PROVIDER=OFF 
-DENABLE_SQL=OFF -DMINIFI_USE_REAL_ODBC_TEST_DRIVER=OFF 
-DCMAKE_BUILD_TYPE_INIT=Release -DCMAKE_BUILD_TYPE=Release -DWIN32=WIN32 
-DENABLE_LIBRDKAFKA=OFF -DENABLE_AWS=OFF -DENABLE_PDH= -DENABLE_AZURE=OFF 
-DENABLE_SFTP=OFF -DENABLE_SPLUNK= -DENABLE_GCP= -DENABLE_OPENCV=OFF 
-DENABLE_PROMETHEUS=OFF -DENABLE_ELASTICSEARCH= -DUSE_SHARED_LIBS=OFF 
-DENABLE_CONTROLLER=ON -DENABLE_BUSTACHE=OFF -DENABLE_ENCRYPT_CONFIG=OFF 
-DENABLE_LUA_SCRIPTING=OFF -DENABLE_MQTT=OFF -DENABLE_OPC=OFF -DENABLE_OPS=OFF 
-DENABLE_PYTHON_SCRIPTING= -DBUILD_ROCKSDB=ON -DUSE_SYSTEM_UUID=OFF 
-DENABLE_LIBARCHIVE=ON -DENABLE_WEL=ON -DMINIFI_FAIL_ON_WARNINGS=OFF 
-DSKIP_TESTS=OFF ..
+cmake -G "Visual Studio 17 2022" -A x64 
-DMINIFI_INCLUDE_VC_REDIST_MERGE_MODULES=OFF -DTEST_CUSTOM_WEL_PROVIDER=OFF 
-DENABLE_SQL=OFF -DMINIFI_USE_REAL_ODBC_TEST_DRIVER=OFF 
-DCMAKE_BUILD_TYPE_INIT=Release -DCMAKE_BUILD_TYPE=Release -DWIN32=WIN32 
-DENABLE_LIBRDKAFKA=OFF -DENABLE_AWS=OFF -DENABLE_PDH= -DENABLE_AZURE=OFF 
-DENABLE_SFTP=OFF -DENABLE_SPLUNK= -DENABLE_GCP= -DENABLE_OPENCV=OFF 
-DENABLE_PROMETHEUS=OFF -DENABLE_ELASTICSEARCH= -DUSE_SHARED_LIBS=OFF 
-DENABLE_CONTROLLER=ON -DENABLE_BUSTACHE=OFF -DENABLE_ENCRYPT_CONFIG=OFF 
-DENABLE_LUA_SCRIPTING=OFF -DENABLE_MQTT=OFF -DENABLE_OPC=OFF -DENABLE_OPS=OFF 
-DENABLE_PYTHON_SCRIPTING= -DBUILD_ROCKSDB=ON -DUSE_SYSTEM_UUID=OFF 
-DENABLE_LIBARCHIVE=ON -DENABLE_WEL=ON -DENABLE_COUCHBASE=OFF 
-DMINIFI_FAIL_ON_WARNINGS=OFF -DSKIP_TESTS=OFF ..

Review Comment:
   Is the Couchbase extension not working on Windows?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to