fgerlits commented on code in PR #1875:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1875#discussion_r1851803746


##########
extensions/couchbase/controllerservices/CouchbaseClusterService.h:
##########
@@ -0,0 +1,162 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <mutex>
+
+#include "core/controller/ControllerService.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/PropertyType.h"
+#include "couchbase/cluster.hxx"
+#include "core/ProcessContext.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::couchbase {
+
+struct CouchbaseCollection {
+  std::string bucket_name;
+  std::string scope_name;
+  std::string collection_name;
+};
+
+struct CouchbaseUpsertResult {
+  std::string bucket_name;
+  std::uint64_t cas{0};
+  std::uint64_t sequence_number{0};
+  std::uint64_t partition_uuid{0};
+  std::uint16_t partition_id{0};
+};
+
+enum class CouchbaseValueType {
+  Json,
+  Binary,
+  String
+};
+
+enum class CouchbaseErrorType {
+  FATAL,
+  TEMPORARY,
+};
+
+class CouchbaseClient {
+ public:
+  CouchbaseClient(std::string connection_string, std::string username, 
std::string password, const std::shared_ptr<core::logging::Logger>& logger)
+    : connection_string_(std::move(connection_string)), 
username_(std::move(username)), password_(std::move(password)), logger_(logger) 
{
+  }
+
+  ~CouchbaseClient() {
+    close();
+  }
+
+  CouchbaseClient(const CouchbaseClient&) = delete;
+  CouchbaseClient(CouchbaseClient&&) = delete;
+  CouchbaseClient& operator=(CouchbaseClient&&) = delete;
+  CouchbaseClient& operator=(const CouchbaseClient&) = delete;
+
+  nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const 
CouchbaseCollection& collection, CouchbaseValueType document_type, const 
std::string& document_id,
+    const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& 
options);
+  nonstd::expected<void, CouchbaseErrorType> establishConnection();
+  void close();
+
+ private:
+  nonstd::expected<::couchbase::collection, CouchbaseErrorType> 
getCollection(const CouchbaseCollection& collection);
+
+  std::string connection_string_;
+  std::string username_;
+  std::string password_;
+  std::mutex cluster_mutex_;
+  std::optional<::couchbase::cluster> cluster_;
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+namespace controllers {
+
+class CouchbaseClusterService : public core::controller::ControllerService {
+ public:
+  explicit CouchbaseClusterService(std::string_view name, const 
minifi::utils::Identifier &uuid = {})
+      : ControllerService(name, uuid) {
+  }
+
+  explicit CouchbaseClusterService(std::string_view name, const 
std::shared_ptr<Configure>& /*configuration*/)
+      : ControllerService(name) {
+  }
+
+  EXTENSIONAPI static constexpr const char* Description = "Provides a 
centralized Couchbase connection and bucket passwords management. Bucket 
passwords can be specified via dynamic properties.";
+
+  EXTENSIONAPI static constexpr auto ConnectionString = 
core::PropertyDefinitionBuilder<>::createProperty("Connection String")
+      .withDescription("The hostnames or ip addresses of the bootstraping 
nodes and optional parameters. Syntax) 
couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN")

Review Comment:
   Typo, probably (`Syntax)` should be `Syntax:`; also `bootstraping` should be `bootstrapping`):
   ```suggestion
         .withDescription("The hostnames or ip addresses of the bootstrapping 
nodes and optional parameters. Syntax: 
couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN")
   ```



##########
thirdparty/bustache/add-append.patch:
##########
@@ -0,0 +1,42 @@
+fmt library 11.0.2 requires an append method in addition to the push_back
+method when formatting into the bustache-specific container
+
+diff --git a/include/bustache/model.hpp b/include/bustache/model.hpp
+index 575969a..6df2cff 100644
+--- a/include/bustache/model.hpp
++++ b/include/bustache/model.hpp
+@@ -8,7 +8,7 @@
+ #define BUSTACHE_MODEL_HPP_INCLUDED
+ 
+ #include <bustache/format.hpp>
+-#include <version> 
++#include <version>
+ #include <vector>
+ #include <cstring>
+ #include <concepts>
+@@ -330,6 +330,16 @@ namespace bustache::detail
+             buf[count++] = c;
+         }
+ 
++        template <typename U>
++        void append(const U* begin, const U* end)
++        {
++            size_t content_count = end - begin;
++            for (size_t i = 0; i < content_count; ++i) {
++                push_back(begin[i]);
++            }
++            begin += content_count;
++        }

Review Comment:
   This implementation looks weird to me: why do we use a numeric counter 
instead of incrementing the pointer, and why are we assigning to the `begin` 
local variable at the end when it won't be used any more?
   ```suggestion
   +        template <typename U>
   +        void append(const U* const begin, const U* const end)
   +        {
   +            for (const U* it = begin; it != end; ++it) {
   +                push_back(*it);
   +            }
   +        }
   ```
   
   Also, is it useful to make this generic? Since `push_back()` is only defined 
for `char`, maybe we could take `const char*` instead of `const U*`.



##########
libminifi/include/Exception.h:
##########
@@ -43,11 +43,12 @@ enum ExceptionType {
   REGEX_EXCEPTION,
   REPOSITORY_EXCEPTION,
   PARAMETER_EXCEPTION,
+  CONTROLLER_ENABLE_EXCEPTION,

Review Comment:
   Why was this added? I can't see it used anywhere.



##########
thirdparty/couchbase/remove-thirdparty.patch:
##########


Review Comment:
   I think the patch would be easier to read if it removed the unnecessary 
lines instead of commenting them out.



##########
extensions/couchbase/controllerservices/CouchbaseClusterService.cpp:
##########
@@ -0,0 +1,167 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CouchbaseClusterService.h"
+#include "couchbase/codec/raw_binary_transcoder.hxx"
+#include "couchbase/codec/raw_string_transcoder.hxx"
+#include "couchbase/codec/raw_json_transcoder.hxx"
+
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::couchbase {
+
+namespace {
+
+constexpr std::array<::couchbase::errc::common, 9> temporary_connection_errors 
= {
+  ::couchbase::errc::common::temporary_failure,
+  ::couchbase::errc::common::request_canceled,
+  ::couchbase::errc::common::internal_server_failure,
+  ::couchbase::errc::common::cas_mismatch,
+  ::couchbase::errc::common::ambiguous_timeout,
+  ::couchbase::errc::common::unambiguous_timeout,
+  ::couchbase::errc::common::rate_limited,
+  ::couchbase::errc::common::quota_limited
+};
+
+CouchbaseErrorType getErrorType(const std::error_code& error_code) {
+  for (const auto& temporary_error : temporary_connection_errors) {
+    if (static_cast<int>(temporary_error) == error_code.value()) {
+      return CouchbaseErrorType::TEMPORARY;
+    }
+  }
+  return CouchbaseErrorType::FATAL;
+}
+
+}  // namespace
+
+nonstd::expected<::couchbase::collection, CouchbaseErrorType> 
CouchbaseClient::getCollection(const CouchbaseCollection& collection) {
+  auto connection_result = establishConnection();
+  if (!connection_result) {
+    return nonstd::make_unexpected(connection_result.error());
+  }
+  std::lock_guard<std::mutex> lock(cluster_mutex_);
+  return 
cluster_->bucket(collection.bucket_name).scope(collection.scope_name).collection(collection.collection_name);
+}
+
+nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> 
CouchbaseClient::upsert(const CouchbaseCollection& collection,
+    CouchbaseValueType document_type, const std::string& document_id, const 
std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) {
+  auto collection_result = getCollection(collection);
+  if (!collection_result.has_value()) {
+    return nonstd::make_unexpected(collection_result.error());
+  }
+
+  std::pair<::couchbase::error, ::couchbase::mutation_result> result;
+  if (document_type == CouchbaseValueType::Json) {
+    result = 
collection_result->upsert<::couchbase::codec::raw_json_transcoder>(document_id, 
buffer, options).get();
+  } else if (document_type == CouchbaseValueType::String) {
+    std::string data_str(reinterpret_cast<const char*>(buffer.data()), 
buffer.size());
+    result = 
collection_result->upsert<::couchbase::codec::raw_string_transcoder>(document_id,
 data_str, options).get();
+  } else {
+    result = 
collection_result->upsert<::couchbase::codec::raw_binary_transcoder>(document_id,
 buffer, options).get();
+  }
+  auto& [upsert_err, upsert_resp] = result;
+  if (upsert_err.ec()) {
+    // ambiguous_timeout should not be retried as we do not know if the insert 
was successful or not
+    if (getErrorType(upsert_err.ec()) == CouchbaseErrorType::TEMPORARY && 
upsert_err.ec().value() != 
static_cast<int>(::couchbase::errc::common::ambiguous_timeout)) {
+      logger_->log_error("Failed to upsert document '{}' to collection 
'{}.{}.{}' due to temporary issue, error code: '{}', message: '{}'",
+        document_id, collection.bucket_name, collection.scope_name, 
collection.collection_name, upsert_err.ec(), upsert_err.message());
+      return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY);
+    }
+    logger_->log_error("Failed to upsert document '{}' to collection 
'{}.{}.{}' with error code: '{}', message: '{}'",
+      document_id, collection.bucket_name, collection.scope_name, 
collection.collection_name, upsert_err.ec(), upsert_err.message());
+    return nonstd::make_unexpected(CouchbaseErrorType::FATAL);
+  } else {
+    const uint64_t partition_uuid = (upsert_resp.mutation_token().has_value() 
? upsert_resp.mutation_token()->partition_uuid() : 0);
+    const uint64_t sequence_number = (upsert_resp.mutation_token().has_value() 
? upsert_resp.mutation_token()->sequence_number() : 0);
+    const uint16_t partition_id = (upsert_resp.mutation_token().has_value() ? 
upsert_resp.mutation_token()->partition_id() : 0);
+    return CouchbaseUpsertResult {
+      collection.bucket_name,
+      upsert_resp.cas().value(),
+      partition_uuid,
+      sequence_number,
+      partition_id
+    };
+  }
+}
+
+void CouchbaseClient::close() {
+  std::lock_guard<std::mutex> lock(cluster_mutex_);
+  if (cluster_) {
+    cluster_->close().wait();

Review Comment:
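   Should we reset `cluster_` to `std::nullopt` after closing it? Otherwise the destructor, which also calls `close()`, would try to close the already-closed cluster a second time.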



##########
docker/test/integration/cluster/containers/Container.py:
##########
@@ -34,6 +34,7 @@ def __init__(self, feature_context: FeatureContext, name, 
engine, vols, network,
         # Get docker client
         self.client = docker.from_env()
         self.deployed = False
+        self.post_startup_commands_finished = False

Review Comment:
   I would move this to the constructor of `CouchbaseServerContainer`, since 
the field is only used in that class.



##########
extensions/couchbase/processors/PutCouchbaseKey.h:
##########
@@ -0,0 +1,161 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "core/AbstractProcessor.h"
+#include "core/ProcessSession.h"
+#include "utils/Enum.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "CouchbaseClusterService.h"
+#include "couchbase/persist_to.hxx"
+#include "couchbase/replicate_to.hxx"
+
+namespace magic_enum::customize {
+
+template <>
+constexpr customize_t 
enum_name<::couchbase::persist_to>(::couchbase::persist_to value) noexcept {
+  switch (value) {
+    case ::couchbase::persist_to::none:
+      return "NONE";
+    case ::couchbase::persist_to::active:
+      return "ACTIVE";
+    case ::couchbase::persist_to::one:
+      return "ONE";
+    case ::couchbase::persist_to::two:
+      return "TWO";
+    case ::couchbase::persist_to::three:
+      return "THREE";
+    case ::couchbase::persist_to::four:
+      return "FOUR";
+  }
+  return invalid_tag;
+}
+
+template <>
+constexpr customize_t 
enum_name<::couchbase::replicate_to>(::couchbase::replicate_to value) noexcept {
+  switch (value) {
+    case ::couchbase::replicate_to::none:
+      return "NONE";
+    case ::couchbase::replicate_to::one:
+      return "ONE";
+    case ::couchbase::replicate_to::two:
+      return "TWO";
+    case ::couchbase::replicate_to::three:
+      return "THREE";
+  }
+  return invalid_tag;
+}
+}  // namespace magic_enum::customize
+
+namespace org::apache::nifi::minifi::couchbase::processors {
+
+class PutCouchbaseKey final : public core::AbstractProcessor<PutCouchbaseKey> {
+ public:
+  using core::AbstractProcessor<PutCouchbaseKey>::AbstractProcessor;
+
+  EXTENSIONAPI static constexpr const char* Description = "Put a document to 
Couchbase Server via Key/Value access.";
+
+  EXTENSIONAPI static constexpr auto CouchbaseClusterControllerService = 
core::PropertyDefinitionBuilder<>::createProperty("Couchbase Cluster Controller 
Service")
+      .withDescription("A Couchbase Cluster Controller Service which manages 
connections to a Couchbase cluster.")
+      .withAllowedTypes<controllers::CouchbaseClusterService>()
+      .isRequired(true)
+      .build();
+  EXTENSIONAPI static constexpr auto BucketName = 
core::PropertyDefinitionBuilder<>::createProperty("Bucket Name")
+      .withDescription("The name of bucket to access.")
+      .withDefaultValue("default")
+      .isRequired(true)
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto ScopeName = 
core::PropertyDefinitionBuilder<>::createProperty("Scope Name")
+      .withDescription("Scope to use inside the bucket. If not specified, the 
_default scope is used.")
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto CollectionName = 
core::PropertyDefinitionBuilder<>::createProperty("Collection Name")
+      .withDescription("Collection to use inside the bucket scope. If not 
specified, the _default collection is used.")
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto DocumentType = 
core::PropertyDefinitionBuilder<3>::createProperty("Document Type")

Review Comment:
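   I think we should use `magic_enum::enum_count` in cases like this, mainly for readability, and also so the allowed-value count stays in sync with `CouchbaseValueType` if values are added or removed: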
   ```suggestion
     EXTENSIONAPI static constexpr auto DocumentType = 
core::PropertyDefinitionBuilder<magic_enum::enum_count<CouchbaseValueType>()>::createProperty("Document
 Type")
   ```



##########
LICENSE:
##########
@@ -214,6 +214,7 @@ This product bundles 'Kubernetes Client Library for C' 
(kubernetes-client/c), wh
 This project bundles a configuration file from 'Kubernetes Metrics Server' 
(kubernetes-sigs/metrics-server), which is available under an ALv2 license
 This project bundles 'OpenSSL' which is available under an ALv2 license
 This project bundles 'gRPC' which is available under an ALv2 license
+This project bundles 'couchbase-cxx-clien' which is available under an ALv2 
license

Review Comment:
   typo:
   ```suggestion
   This project bundles 'couchbase-cxx-client' which is available under an ALv2 
license
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to