This is an automated email from the ASF dual-hosted git repository. martinzink pushed a commit to branch minifi-api-property in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit af72292a3b59d76d103764cf6a1f39fd06374759 Author: Martin Zink <[email protected]> AuthorDate: Thu Mar 13 13:00:02 2025 +0100 orThrow and orTerminate --- extension-utils/src/utils/ProcessorConfigUtils.cpp | 22 +++++++++---------- extensions/aws/processors/PutS3Object.cpp | 4 ++-- extensions/aws/processors/S3Processor.cpp | 2 +- extensions/aws/s3/MultipartUploadStateStorage.cpp | 10 ++++----- extensions/civetweb/processors/ListenHTTP.cpp | 2 +- .../controllerservices/CouchbaseClusterService.cpp | 2 +- extensions/kafka/ConsumeKafka.cpp | 2 +- extensions/mqtt/processors/ConsumeMQTT.cpp | 2 +- extensions/mqtt/processors/PublishMQTT.cpp | 2 +- extensions/rocksdb-repos/ProvenanceRepository.cpp | 2 +- .../controllers/RocksDbStateStorage.cpp | 6 +++--- extensions/smb/SmbConnectionControllerService.cpp | 4 ++-- .../controllers/JsonRecordSetWriter.cpp | 4 ++-- .../standard-processors/modbus/FetchModbusTcp.cpp | 2 +- .../processors/AppendHostInfo.cpp | 4 ++-- .../processors/AttributesToJSON.cpp | 4 ++-- .../processors/DefragmentText.cpp | 2 +- .../standard-processors/processors/ExtractText.cpp | 2 +- .../standard-processors/processors/HashContent.cpp | 6 +++--- .../standard-processors/processors/RouteText.cpp | 6 +++--- .../standard-processors/processors/TailFile.cpp | 2 +- libminifi/src/RemoteProcessorGroupPort.cpp | 2 +- libminifi/test/libtest/unit/MockClasses.h | 2 +- libminifi/test/unit/ExpectedTest.cpp | 6 +++--- libminifi/test/unit/OptionalTest.cpp | 8 +++++++ utils/include/utils/OptionalUtils.h | 25 +++++++++++++++++----- .../utils/detail/MonadicOperationWrappers.h | 14 ++++++++++-- utils/include/utils/expected.h | 16 ++++++++++++-- 28 files changed, 105 insertions(+), 60 deletions(-) diff --git a/extension-utils/src/utils/ProcessorConfigUtils.cpp b/extension-utils/src/utils/ProcessorConfigUtils.cpp index fc0125e97..8c53cef00 100644 --- a/extension-utils/src/utils/ProcessorConfigUtils.cpp +++ b/extension-utils/src/utils/ProcessorConfigUtils.cpp @@ -23,7 +23,7 @@ namespace org::apache::nifi::minifi::utils { std::string parseProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { - return ctx.getProperty(property, flow_file) | utils::expect(fmt::format("Expected valid value from {}::{}", ctx.getProcessor().getName(), property.name)); + return ctx.getProperty(property, flow_file) | utils::orThrow(fmt::format("Expected valid value from {}::{}", ctx.getProcessor().getName(), property.name)); } std::optional<std::string> parseOptionalProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { @@ -32,13 +32,13 @@ std::optional<std::string> parseOptionalProperty(const core::ProcessContext& ctx std::optional<bool> parseOptionalBoolProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { if (const auto property_str = ctx.getProperty(property, flow_file)) { - return parsing::parseBool(*property_str) | utils::expect(fmt::format("Expected parsable bool from {}::{}", ctx.getProcessor().getName(), property.name)); + return parsing::parseBool(*property_str) | utils::orThrow(fmt::format("Expected parsable bool from {}::{}", ctx.getProcessor().getName(), property.name)); } return std::nullopt; } bool parseBoolProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { - return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseBool) | utils::expect(fmt::format("Expected parsable bool from {}::{}", ctx.getProcessor().getName(), property.name)); + return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseBool) | utils::orThrow(fmt::format("Expected parsable bool from {}::{}", ctx.getProcessor().getName(), property.name)); } std::optional<uint64_t> parseOptionalU64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { @@ -46,14 +46,14 @@ std::optional<uint64_t> parseOptionalU64Property(const core::ProcessContext& ctx if (property_str->empty()) { return std::nullopt; } - return parsing::parseIntegral<uint64_t>(*property_str) | utils::expect(fmt::format("Expected parsable uint64_t from {}::{}", ctx.getProcessor().getName(), property.name)); + return parsing::parseIntegral<uint64_t>(*property_str) | utils::orThrow(fmt::format("Expected parsable uint64_t from {}::{}", ctx.getProcessor().getName(), property.name)); } return std::nullopt; } uint64_t parseU64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { - return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseIntegral<uint64_t>) | utils::expect(fmt::format("Expected parsable uint64_t from {}::{}", ctx.getProcessor().getName(), property.name)); + return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseIntegral<uint64_t>) | utils::orThrow(fmt::format("Expected parsable uint64_t from {}::{}", ctx.getProcessor().getName(), property.name)); } std::optional<int64_t> parseOptionalI64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { @@ -61,14 +61,14 @@ std::optional<int64_t> parseOptionalI64Property(const core::ProcessContext& ctx, if (property_str->empty()) { return std::nullopt; } - return parsing::parseIntegral<int64_t>(*property_str) | utils::expect(fmt::format("Expected parsable int64_t from {}::{}", ctx.getProcessor().getName(), property.name)); + return parsing::parseIntegral<int64_t>(*property_str) | utils::orThrow(fmt::format("Expected parsable int64_t from {}::{}", ctx.getProcessor().getName(), property.name)); } return std::nullopt; } int64_t parseI64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { - return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseIntegral<int64_t>) | utils::expect(fmt::format("Expected parsable int64_t from {}::{}", ctx.getProcessor().getName(), property.name)); + return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseIntegral<int64_t>) | utils::orThrow(fmt::format("Expected parsable int64_t from {}::{}", ctx.getProcessor().getName(), property.name)); } std::optional<std::chrono::milliseconds> parseOptionalDurationProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { @@ -76,14 +76,14 @@ std::optional<std::chrono::milliseconds> parseOptionalDurationProperty(const cor if (property_str->empty()) { return std::nullopt; } - return parsing::parseDuration(*property_str) | utils::expect(fmt::format("Expected parsable duration from {}::{}", ctx.getProcessor().getName(), property.name)); + return parsing::parseDuration(*property_str) | utils::orThrow(fmt::format("Expected parsable duration from {}::{}", ctx.getProcessor().getName(), property.name)); } return std::nullopt; } std::chrono::milliseconds parseDurationProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { - return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | utils::expect(fmt::format("Expected parsable duration from {}::{}", ctx.getProcessor().getName(), property.name)); + return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | utils::orThrow(fmt::format("Expected parsable duration from {}::{}", ctx.getProcessor().getName(), property.name)); } std::optional<uint64_t> parseOptionalDataSizeProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { @@ -91,14 +91,14 @@ std::optional<uint64_t> parseOptionalDataSizeProperty(const core::ProcessContext if (property_str->empty()) { return std::nullopt; } - return parsing::parseDataSize(*property_str) | utils::expect(fmt::format("Expected parsable data size from {}::{}", ctx.getProcessor().getName(), property.name)); + return parsing::parseDataSize(*property_str) | utils::orThrow(fmt::format("Expected parsable data size from {}::{}", ctx.getProcessor().getName(), property.name)); } return std::nullopt; } uint64_t parseDataSizeProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { - return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseDataSize) | utils::expect(fmt::format("Expected parsable data size from {}::{}", ctx.getProcessor().getName(), property.name)); + return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseDataSize) | utils::orThrow(fmt::format("Expected parsable data size from {}::{}", ctx.getProcessor().getName(), property.name)); } diff --git a/extensions/aws/processors/PutS3Object.cpp b/extensions/aws/processors/PutS3Object.cpp index 8fda0504a..02f76b19b 100644 --- a/extensions/aws/processors/PutS3Object.cpp +++ b/extensions/aws/processors/PutS3Object.cpp @@ -70,13 +70,13 @@ void PutS3Object::onSchedule(core::ProcessContext& context, core::ProcessSession multipart_threshold_ = context.getProperty(MultipartThreshold) | minifi::utils::andThen([&](const auto str) { return parsing::parseDataSizeMinMax(str, getMinPartSize(), getMaxUploadSize()); }) - | minifi::utils::expect("Multipart Part Size is not between the valid 5MB and 5GB range!"); + | minifi::utils::orThrow("Multipart Part Size is not between the valid 5MB and 5GB range!"); logger_->log_debug("PutS3Object: Multipart Threshold {}", multipart_threshold_); multipart_size_ = context.getProperty(MultipartPartSize) | minifi::utils::andThen([&](const auto str) { return parsing::parseDataSizeMinMax(str, getMinPartSize(), getMaxUploadSize()); }) - | minifi::utils::expect("Multipart Part Size is not between the valid 5MB and 5GB range!"); + | minifi::utils::orThrow("Multipart Part Size is not between the valid 5MB and 5GB range!"); logger_->log_debug("PutS3Object: Multipart Size {}", multipart_size_); diff --git a/extensions/aws/processors/S3Processor.cpp b/extensions/aws/processors/S3Processor.cpp index 70467d9e9..50351cec1 100644 --- a/extensions/aws/processors/S3Processor.cpp +++ b/extensions/aws/processors/S3Processor.cpp @@ -98,7 +98,7 @@ void S3Processor::onSchedule(core::ProcessContext& context, core::ProcessSession throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or invalid"); } - client_config_->region = context.getProperty(Region) | minifi::utils::expect("Region property missing or invalid"); + client_config_->region = context.getProperty(Region) | minifi::utils::orThrow("Region property missing or invalid"); logger_->log_debug("S3Processor: Region [{}]", client_config_->region); if (auto communications_timeout = minifi::utils::parseOptionalDurationProperty(context, CommunicationsTimeout)) { diff --git a/extensions/aws/s3/MultipartUploadStateStorage.cpp b/extensions/aws/s3/MultipartUploadStateStorage.cpp index e654d5e95..ede5bf9e3 100644 --- a/extensions/aws/s3/MultipartUploadStateStorage.cpp +++ b/extensions/aws/s3/MultipartUploadStateStorage.cpp @@ -57,11 +57,11 @@ std::optional<MultipartUploadState> MultipartUploadStateStorage::getState(const MultipartUploadState state; state.upload_id = state_map[state_key + ".upload_id"]; - state.upload_time_ms_since_epoch = parsing::parseIntegral<int64_t>(state_map[state_key + ".upload_time"]) | utils::expect(fmt::format("Expected parsable {}.upload_time", state_key)); - state.uploaded_parts = parsing::parseIntegral<size_t>(state_map[state_key + ".uploaded_parts"]) | utils::expect(fmt::format("Expected parsable {}.uploaded_parts", state_key)); - state.uploaded_size = parsing::parseIntegral<uint64_t>(state_map[state_key + ".uploaded_size"]) | utils::expect(fmt::format("Expected parsable {}.uploaded_size", state_key)); - state.part_size = parsing::parseIntegral<uint64_t>(state_map[state_key + ".part_size"]) | utils::expect(fmt::format("Expected parsable {}.part_size", state_key)); - state.full_size = parsing::parseIntegral<uint64_t>(state_map[state_key + ".full_size"]) | utils::expect(fmt::format("Expected parsable {}.full_size", state_key)); + state.upload_time_ms_since_epoch = parsing::parseIntegral<int64_t>(state_map[state_key + ".upload_time"]) | utils::orThrow(fmt::format("Expected parsable {}.upload_time", state_key)); + state.uploaded_parts = parsing::parseIntegral<size_t>(state_map[state_key + ".uploaded_parts"]) | utils::orThrow(fmt::format("Expected parsable {}.uploaded_parts", state_key)); + state.uploaded_size = parsing::parseIntegral<uint64_t>(state_map[state_key + ".uploaded_size"]) | utils::orThrow(fmt::format("Expected parsable {}.uploaded_size", state_key)); + state.part_size = parsing::parseIntegral<uint64_t>(state_map[state_key + ".part_size"]) | utils::orThrow(fmt::format("Expected parsable {}.part_size", state_key)); + state.full_size = parsing::parseIntegral<uint64_t>(state_map[state_key + ".full_size"]) | utils::orThrow(fmt::format("Expected parsable {}.full_size", state_key)); state.uploaded_etags = minifi::utils::string::splitAndTrimRemovingEmpty(state_map[state_key + ".uploaded_etags"], ";"); return state; } diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp index 39cf718fa..f71da2bee 100644 --- a/extensions/civetweb/processors/ListenHTTP.cpp +++ b/extensions/civetweb/processors/ListenHTTP.cpp @@ -40,7 +40,7 @@ void ListenHTTP::initialize() { } void ListenHTTP::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { - std::string base_path = context.getProperty(BasePath) | utils::expect("ListenHTTP::BasePath has default value"); + std::string base_path = context.getProperty(BasePath) | utils::orThrow("ListenHTTP::BasePath has default value"); base_path.insert(0, "/"); diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp index b097226aa..a86702b98 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp @@ -216,7 +216,7 @@ void CouchbaseClusterService::initialize() { } void CouchbaseClusterService::onEnable() { - std::string connection_string = getProperty(ConnectionString.name) | utils::expect("required property"); + std::string connection_string = getProperty(ConnectionString.name) | utils::orThrow("required property"); std::string username = getProperty(UserName.name).value_or(""); std::string password = getProperty(UserPassword.name).value_or(""); diff --git a/extensions/kafka/ConsumeKafka.cpp b/extensions/kafka/ConsumeKafka.cpp index 6cd444f78..1f0f12558 100644 --- a/extensions/kafka/ConsumeKafka.cpp +++ b/extensions/kafka/ConsumeKafka.cpp @@ -142,7 +142,7 @@ void ConsumeKafka::extend_config_from_dynamic_properties(const core::ProcessCont } logger_->log_info("Loading {} extra kafka configuration fields from ConsumeKafka dynamic properties:", dynamic_prop_keys.size()); for (const std::string& key : dynamic_prop_keys) { - std::string value = context.getDynamicProperty(key) | utils::expect(""); + std::string value = context.getDynamicProperty(key) | utils::orThrow(fmt::format("This shouldn't happen, dynamic property {} is expected because we just queried the list of dynamic properties", key)); logger_->log_info("{}: {}", key.c_str(), value.c_str()); setKafkaConfigurationField(*conf_, key, value); } diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp b/extensions/mqtt/processors/ConsumeMQTT.cpp index e4e5b1a72..9182a3969 100644 --- a/extensions/mqtt/processors/ConsumeMQTT.cpp +++ b/extensions/mqtt/processors/ConsumeMQTT.cpp @@ -206,7 +206,7 @@ void ConsumeMQTT::resolveTopicFromAlias(SmartMessage& smart_message) { void ConsumeMQTT::checkProperties() { auto is_property_explicitly_set = [this](const std::string_view property_name) -> bool { - const auto property_values = getAllPropertyValues(property_name) | utils::expect("It should only be called on valid property"); + const auto property_values = getAllPropertyValues(property_name) | utils::orThrow("It should only be called on valid property"); return !property_values.empty(); }; if (mqtt_version_ == mqtt::MqttVersions::V_3_1_0 || mqtt_version_ == mqtt::MqttVersions::V_3_1_1 || mqtt_version_ == mqtt::MqttVersions::V_3X_AUTO) { diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp index f9bcd1970..d71ce21a2 100644 --- a/extensions/mqtt/processors/PublishMQTT.cpp +++ b/extensions/mqtt/processors/PublishMQTT.cpp @@ -133,7 +133,7 @@ bool PublishMQTT::sendMessage(const std::vector<std::byte>& buffer, const std::s void PublishMQTT::checkProperties() { auto is_property_explicitly_set = [this](const std::string_view property_name) -> bool { - const auto property_values = getAllPropertyValues(property_name) | utils::expect("It should only be called on valid property"); + const auto property_values = getAllPropertyValues(property_name) | utils::orThrow("It should only be called on valid property"); return !property_values.empty(); }; if ((mqtt_version_ == mqtt::MqttVersions::V_3_1_0 || mqtt_version_ == mqtt::MqttVersions::V_3_1_1 || mqtt_version_ == mqtt::MqttVersions::V_3X_AUTO)) { diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp index 9502e0365..9b5571fc0 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.cpp +++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp @@ -30,7 +30,7 @@ bool ProvenanceRepository::initialize(const std::shared_ptr<org::apache::nifi::m } logger_->log_debug("MiNiFi Provenance Repository Directory {}", directory_); if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) { - max_partition_bytes_ = gsl::narrow<int64_t>(parsing::parseDataSize(value) | utils::expect("expected parsable data size")); + max_partition_bytes_ = gsl::narrow<int64_t>(parsing::parseDataSize(value) | utils::orThrow("expected parsable data size")); } logger_->log_debug("MiNiFi Provenance Max Partition Bytes {}", max_partition_bytes_); if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) { diff --git a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp index 7554eebbd..db1382988 100644 --- a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp +++ b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp @@ -47,15 +47,15 @@ void RocksDbStateStorage::onEnable() { const auto always_persist = getProperty(AlwaysPersist.name) | utils::andThen(parsing::parseBool) - | utils::expect("RocksDbStateStorage::AlwaysPersist has default value"); + | utils::orThrow("RocksDbStateStorage::AlwaysPersist has default value"); logger_->log_info("Always Persist property: {}", always_persist); const auto auto_persistence_interval = getProperty(AutoPersistenceInterval.name) | utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) - | utils::expect("RocksDbStateStorage::AutoPersistenceInterval has default value"); + | utils::orThrow("RocksDbStateStorage::AutoPersistenceInterval has default value"); logger_->log_info("Auto Persistence Interval property: {}", auto_persistence_interval); - directory_ = getProperty(Directory.name) | utils::expect("RocksDbStateStorage::Directory is required property"); + directory_ = getProperty(Directory.name) | utils::orThrow("RocksDbStateStorage::Directory is required property"); auto_persistor_.start(always_persist, auto_persistence_interval, [this] { return persistNonVirtual(); }); db_.reset(); diff --git a/extensions/smb/SmbConnectionControllerService.cpp b/extensions/smb/SmbConnectionControllerService.cpp index 2a9349310..87876dd95 100644 --- a/extensions/smb/SmbConnectionControllerService.cpp +++ b/extensions/smb/SmbConnectionControllerService.cpp @@ -28,8 +28,8 @@ void SmbConnectionControllerService::initialize() { } void SmbConnectionControllerService::onEnable() { - std::string hostname = getProperty(Hostname.name) | utils::expect("Required property"); - std::string share = getProperty(Share.name) | utils::expect("Required property"); + std::string hostname = getProperty(Hostname.name) | utils::orThrow("Required property"); + std::string share = getProperty(Share.name) | utils::orThrow("Required property"); server_path_ = "\\\\" + hostname + "\\" + share; diff --git a/extensions/standard-processors/controllers/JsonRecordSetWriter.cpp b/extensions/standard-processors/controllers/JsonRecordSetWriter.cpp index e648dd41d..0f9e37de7 100644 --- a/extensions/standard-processors/controllers/JsonRecordSetWriter.cpp +++ b/extensions/standard-processors/controllers/JsonRecordSetWriter.cpp @@ -70,8 +70,8 @@ rapidjson::Value toJson(const core::RecordObject& field, rapidjson::Document::Al } // namespace void JsonRecordSetWriter::onEnable() { - output_grouping_ = getProperty(OutputGrouping.name) | utils::andThen(parsing::parseEnum<OutputGroupingType>) | utils::expect("JsonRecordSetWriter::OutputGrouping is required property"); - pretty_print_ = getProperty(PrettyPrint.name) | utils::andThen(parsing::parseBool) | utils::expect("Missing JsonRecordSetWriter::PrettyPrint despite default value"); + output_grouping_ = getProperty(OutputGrouping.name) | utils::andThen(parsing::parseEnum<OutputGroupingType>) | utils::orThrow("JsonRecordSetWriter::OutputGrouping is required property"); + pretty_print_ = getProperty(PrettyPrint.name) | utils::andThen(parsing::parseBool) | utils::orThrow("Missing JsonRecordSetWriter::PrettyPrint despite default value"); } void JsonRecordSetWriter::writePerLine(const core::RecordSet& record_set, const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) { diff --git a/extensions/standard-processors/modbus/FetchModbusTcp.cpp b/extensions/standard-processors/modbus/FetchModbusTcp.cpp index 489712b91..516a3e9a4 100644 --- a/extensions/standard-processors/modbus/FetchModbusTcp.cpp +++ b/extensions/standard-processors/modbus/FetchModbusTcp.cpp @@ -52,7 +52,7 @@ void FetchModbusTcp::onSchedule(core::ProcessContext& context, core::ProcessSess timeout_duration_ = utils::parseOptionalDurationProperty(context, Timeout).value_or(15s); - if (context.getProperty(ConnectionPerFlowFile) | utils::andThen(parsing::parseBool) | utils::expect("FetchModbusTcp::ConnectionPerFlowFile is required property")) { + if (context.getProperty(ConnectionPerFlowFile) | utils::andThen(parsing::parseBool) | utils::orThrow("FetchModbusTcp::ConnectionPerFlowFile is required property")) { connections_.reset(); } else { connections_.emplace(); diff --git a/extensions/standard-processors/processors/AppendHostInfo.cpp b/extensions/standard-processors/processors/AppendHostInfo.cpp index 920d9b80d..dfd2a610b 100644 --- a/extensions/standard-processors/processors/AppendHostInfo.cpp +++ b/extensions/standard-processors/processors/AppendHostInfo.cpp @@ -43,8 +43,8 @@ void AppendHostInfo::initialize() { void AppendHostInfo::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { std::unique_lock unique_lock(shared_mutex_); - hostname_attribute_name_ = context.getProperty(HostAttribute) | utils::expect("HostAttribute has default value"); - ipaddress_attribute_name_ = context.getProperty(IPAttribute) | utils::expect("IPAttribute has default value"); + hostname_attribute_name_ = context.getProperty(HostAttribute) | utils::orThrow("HostAttribute has default value"); + ipaddress_attribute_name_ = context.getProperty(IPAttribute) | utils::orThrow("IPAttribute has default value"); interface_name_filter_ = context.getProperty(InterfaceNameFilter) | utils::toOptional() | utils::filter([](const std::string& inf) { return !inf.empty(); }); diff --git a/extensions/standard-processors/processors/AttributesToJSON.cpp b/extensions/standard-processors/processors/AttributesToJSON.cpp index ebff3e9dc..db91ee6a7 100644 --- a/extensions/standard-processors/processors/AttributesToJSON.cpp +++ b/extensions/standard-processors/processors/AttributesToJSON.cpp @@ -49,10 +49,10 @@ void AttributesToJSON::onSchedule(core::ProcessContext& context, core::ProcessSe include_core_attributes_ = context.getProperty(IncludeCoreAttributes) | utils::andThen(parsing::parseBool) - | utils::expect("AttributesToJSON::IncludeCoreAttributes should be available in onSchedule"); + | utils::orThrow("AttributesToJSON::IncludeCoreAttributes should be available in onSchedule"); null_value_ = context.getProperty(NullValue) | utils::andThen(parsing::parseBool) - | utils::expect("AttributesToJSON::NullValue should be available in onSchedule"); + | utils::orThrow("AttributesToJSON::NullValue should be available in onSchedule"); } bool AttributesToJSON::isCoreAttributeToBeFiltered(const std::string& attribute) const { diff --git a/extensions/standard-processors/processors/DefragmentText.cpp b/extensions/standard-processors/processors/DefragmentText.cpp index 9adc63945..a85a47391 100644 --- a/extensions/standard-processors/processors/DefragmentText.cpp +++ b/extensions/standard-processors/processors/DefragmentText.cpp @@ -53,7 +53,7 @@ void DefragmentText::onSchedule(core::ProcessContext& context, core::ProcessSess pattern_ = context.getProperty(Pattern) | utils::transform([](const auto pattern_str) { return utils::Regex{pattern_str}; }) - | utils::expect("Pattern property missing or invalid"); + | utils::orThrow("Pattern property missing or invalid"); } void DefragmentText::onTrigger(core::ProcessContext&, core::ProcessSession& session) { diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp index 105abcc1e..45693a8a7 100644 --- a/extensions/standard-processors/processors/ExtractText.cpp +++ b/extensions/standard-processors/processors/ExtractText.cpp @@ -63,7 +63,7 @@ int64_t ExtractText::ReadCallback::operator()(const std::shared_ptr<io::InputStr std::string attrKey = ctx_->getProperty(Attribute).value_or(""); std::string sizeLimitStr = ctx_->getProperty(SizeLimit).value_or(""); - bool regex_mode = ctx_->getProperty(RegexMode) | utils::andThen(parsing::parseBool) | utils::expect("Missing ExtractText::RegexMode despite default value"); + bool regex_mode = ctx_->getProperty(RegexMode) | utils::andThen(parsing::parseBool) | utils::orThrow("Missing ExtractText::RegexMode despite default value"); if (sizeLimitStr.empty()) sizeLimitStr = DEFAULT_SIZE_LIMIT_STR; diff --git a/extensions/standard-processors/processors/HashContent.cpp b/extensions/standard-processors/processors/HashContent.cpp index 269d1b4cc..b7d199d78 100644 --- a/extensions/standard-processors/processors/HashContent.cpp +++ b/extensions/standard-processors/processors/HashContent.cpp @@ -39,11 +39,11 @@ void HashContent::initialize() { } void HashContent::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { - attrKey_ = context.getProperty(HashAttribute) | utils::expect("Missing HashContent::HashAttribute despite default value"); - failOnEmpty_ = context.getProperty(FailOnEmpty) | utils::andThen(parsing::parseBool) | utils::expect("Missing HashContent::FailOnEmpty despite default value"); + attrKey_ = context.getProperty(HashAttribute) | utils::orThrow("Missing HashContent::HashAttribute despite default value"); + failOnEmpty_ = context.getProperty(FailOnEmpty) | utils::andThen(parsing::parseBool) | utils::orThrow("Missing HashContent::FailOnEmpty despite default value"); { - std::string algo_name = context.getProperty(HashAlgorithm) | utils::expect("HashContent::HashAlgorithm is required property"); + std::string algo_name = context.getProperty(HashAlgorithm) | utils::orThrow("HashContent::HashAlgorithm is required property"); std::transform(algo_name.begin(), algo_name.end(), algo_name.begin(), ::toupper); std::erase(algo_name, '-'); if (!HashAlgos.contains(algo_name)) { diff --git a/extensions/standard-processors/processors/RouteText.cpp b/extensions/standard-processors/processors/RouteText.cpp index 1688f638d..205ec5566 100644 --- a/extensions/standard-processors/processors/RouteText.cpp +++ b/extensions/standard-processors/processors/RouteText.cpp @@ -163,7 +163,7 @@ class RouteText::MatchingContext { if (it != regex_values_.end()) { return it->second; } - const auto value = process_context_.getDynamicProperty(property_name, flow_file_.get()) | utils::expect("Missing dynamic property"); + const auto value = process_context_.getDynamicProperty(property_name, flow_file_.get()) | utils::orThrow("Missing dynamic property"); std::vector<utils::Regex::Mode> flags; if (case_policy_ == route_text::CasePolicy::IGNORE_CASE) { flags.push_back(utils::Regex::Mode::ICASE); @@ -176,7 +176,7 @@ class RouteText::MatchingContext { if (it != string_values_.end()) { return it->second; } - const auto value = process_context_.getDynamicProperty(property_name, flow_file_.get()) | utils::expect("Missing dynamic property"); + const auto value = process_context_.getDynamicProperty(property_name, flow_file_.get()) | utils::orThrow("Missing dynamic property"); return (string_values_[property_name] = value); } @@ -185,7 +185,7 @@ class RouteText::MatchingContext { if (it != searcher_values_.end()) { return it->second.searcher_; } - const auto value = process_context_.getDynamicProperty(property_name, flow_file_.get()) | utils::expect("Missing dynamic property"); + const auto value = process_context_.getDynamicProperty(property_name, flow_file_.get()) | utils::orThrow("Missing dynamic property"); return searcher_values_.emplace( std::piecewise_construct, std::forward_as_tuple(property_name), diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp index 9cf889c13..c1d714445 100644 --- a/extensions/standard-processors/processors/TailFile.cpp +++ b/extensions/standard-processors/processors/TailFile.cpp @@ -797,7 +797,7 @@ std::string TailFile::baseDirectoryFromAttributes(const controllers::AttributePr for (const auto& [key, value] : attribute_map) { flow_file->setAttribute(key, value); } - return context.getProperty(BaseDirectory, flow_file.get()) | utils::expect("Base directory is required for multiple tail mode."); + return context.getProperty(BaseDirectory, flow_file.get()) | utils::orThrow("Base directory is required for multiple tail mode."); } std::chrono::milliseconds TailFile::getLookupFrequency() const { diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 608b2afb0..ba340685d 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -131,7 +131,7 @@ void RemoteProcessorGroupPort::onSchedule(core::ProcessContext& context, core::P } } - idle_timeout_ = context.getProperty(idleTimeout) | utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | utils::expect("RemoteProcessGroupPort::idleTimeout has default value"); + idle_timeout_ = context.getProperty(idleTimeout) | utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | utils::orThrow("RemoteProcessGroupPort::idleTimeout has default value"); std::lock_guard<std::mutex> lock(peer_mutex_); if (!nifi_instances_.empty()) { diff --git a/libminifi/test/libtest/unit/MockClasses.h b/libminifi/test/libtest/unit/MockClasses.h index ff89d1fc6..a38791d38 100644 --- a/libminifi/test/libtest/unit/MockClasses.h +++ b/libminifi/test/libtest/unit/MockClasses.h @@ -130,7 +130,7 @@ class MockProcessor : public minifi::core::ProcessorImpl { // and verify that we can execute it. } - bool in_sub_process_group = getProperty("InSubProcessGroup") | minifi::utils::andThen(minifi::parsing::parseBool) | minifi::utils::expect(""); + bool in_sub_process_group = getProperty("InSubProcessGroup") | minifi::utils::andThen(minifi::parsing::parseBool) | minifi::utils::orThrow(""); auto sub_service = context.getControllerService("SubMockController", getUUID()); if (in_sub_process_group) { REQUIRE(nullptr != sub_service); diff --git a/libminifi/test/unit/ExpectedTest.cpp b/libminifi/test/unit/ExpectedTest.cpp index 8d6463786..10907c7d7 100644 --- a/libminifi/test/unit/ExpectedTest.cpp +++ b/libminifi/test/unit/ExpectedTest.cpp @@ -554,10 +554,10 @@ TEST_CASE("expected toOptional") { CHECK(res2 == 5); } -TEST_CASE("expected expect") { +TEST_CASE("expected orThrow") { nonstd::expected<int, std::string> unexpected{nonstd::unexpect, "hello"}; nonstd::expected<int, std::string> expected{5}; - REQUIRE_THROWS_WITH(std::move(unexpected) | utils::expect("should throw"), "should throw"); - CHECK((std::move(expected) | utils::expect("should be 5")) == 5); + REQUIRE_THROWS_WITH(std::move(unexpected) | utils::orThrow("should throw"), "should throw: hello"); + CHECK((std::move(expected) | utils::orThrow("should be 5")) == 5); } diff --git a/libminifi/test/unit/OptionalTest.cpp b/libminifi/test/unit/OptionalTest.cpp index d5cff9335..5d28da01c 100644 --- a/libminifi/test/unit/OptionalTest.cpp +++ b/libminifi/test/unit/OptionalTest.cpp @@ -93,3 +93,11 @@ TEST_CASE("optional toExpected") { CHECK(expected_from_null_opt_ec.error() == std::make_error_code(std::io_errc::stream)); CHECK(expected_from_null_opt_int.error() == 9); } + +TEST_CASE("optional orThrow") { + std::optional<int> opt_with_value = 5; + std::optional<int> opt_without_value = std::nullopt; + + REQUIRE_THROWS_WITH(opt_without_value | utils::orThrow("should throw"), "should throw"); + CHECK((opt_with_value | utils::orThrow("should be 5")) == 5); +} diff --git a/utils/include/utils/OptionalUtils.h b/utils/include/utils/OptionalUtils.h index 30451b644..907990496 100644 --- a/utils/include/utils/OptionalUtils.h +++ b/utils/include/utils/OptionalUtils.h @@ -15,18 +15,19 @@ * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_UTILS_OPTIONALUTILS_H_ -#define LIBMINIFI_INCLUDE_UTILS_OPTIONALUTILS_H_ +#pragma once #include <functional> #include <optional> #include <type_traits> #include <utility> +#include <iostream> #include "nonstd/expected.hpp" #include "utils/GeneralUtils.h" #include "utils/gsl.h" #include "utils/detail/MonadicOperationWrappers.h" +#include "fmt/format.h" namespace org::apache::nifi::minifi::utils { @@ -134,8 +135,22 @@ nonstd::expected<T, E> operator|(std::optional<T> object, to_expected_wrapper<E> } return std::move(*object); } -} // namespace detail -} // namespace org::apache::nifi::minifi::utils -#endif // LIBMINIFI_INCLUDE_UTILS_OPTIONALUTILS_H_ +template<typename T> +T&& operator|(std::optional<T> object, const or_throw_wrapper e) { + if (object) { + return std::move(*object); + } + throw std::runtime_error(e.reason); +} +template<typename T> +T&& operator|(std::optional<T> object, const or_terminate_wrapper e) { + if (object) { + return std::move(*object); + } + std::cerr << fmt::format("Aborting due to {}", e.reason) << std::endl; + std::abort(); +} +} // namespace detail +} // namespace org::apache::nifi::minifi::utils diff --git a/utils/include/utils/detail/MonadicOperationWrappers.h b/utils/include/utils/detail/MonadicOperationWrappers.h index 6d94da403..7c491643b 100644 --- a/utils/include/utils/detail/MonadicOperationWrappers.h +++ b/utils/include/utils/detail/MonadicOperationWrappers.h @@ -57,7 +57,11 @@ struct to_expected_wrapper { E error; }; -struct expect_wrapper { +struct or_throw_wrapper { + std::string reason; +}; + +struct or_terminate_wrapper { std::string reason; }; } // namespace detail @@ -117,7 +121,13 @@ inline detail::to_optional_wrapper toOptional() noexcept { return {}; } * For optional-like types, returns the present value or throws with the provided message * It is recommended that expect messages are used to describe the reason you expect the optional-like to have value. */ -inline detail::expect_wrapper expect(std::string&& exception_message) noexcept { return {std::forward<std::string>(std::move(exception_message))}; } +inline detail::or_throw_wrapper orThrow(std::string&& exception_message) noexcept { return {std::forward<std::string>(std::move(exception_message))}; } + +/** + * For optional-like types, returns the present value or aborts with the provided message + */ +inline detail::or_terminate_wrapper orTerminate(std::string&& exception_message) noexcept { return {std::forward<std::string>(std::move(exception_message))}; } + /** * For optional-like types, only keep the present value if it satisfies the predicate diff --git a/utils/include/utils/expected.h b/utils/include/utils/expected.h index 0ddd40012..b2d44949d 100644 --- a/utils/include/utils/expected.h +++ b/utils/include/utils/expected.h @@ -18,8 +18,11 @@ #include <type_traits> #include <utility> #include <optional> +#include <iostream> + #include "nonstd/expected.hpp" #include "utils/detail/MonadicOperationWrappers.h" +#include "fmt/format.h" namespace org::apache::nifi::minifi::utils { namespace detail { @@ -191,11 +194,20 @@ std::optional<typename std::remove_cvref_t<Expected>::value_type> operator|(Expe } template<expected Expected> -typename std::remove_cvref_t<Expected>::value_type operator|(Expected&& object, expect_wrapper e) { +typename std::remove_cvref_t<Expected>::value_type operator|(Expected&& object, or_throw_wrapper e) { + if (object) { + return std::move(*object); + } + throw std::runtime_error(fmt::format("{}: {}", e.reason, object.error())); +} + +template<expected Expected> +typename std::remove_cvref_t<Expected>::value_type operator|(Expected&& object, detail::or_terminate_wrapper e) { if (object) { return std::move(*object); } - throw std::runtime_error(e.reason); + std::cerr << fmt::format("Aborting due to {}: {}", e.reason, object.error()) << std::endl; + std::abort(); } } // namespace detail
