This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch minifi-api-property
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit af72292a3b59d76d103764cf6a1f39fd06374759
Author: Martin Zink <[email protected]>
AuthorDate: Thu Mar 13 13:00:02 2025 +0100

    orThrow and orTerminate
---
 extension-utils/src/utils/ProcessorConfigUtils.cpp | 22 +++++++++----------
 extensions/aws/processors/PutS3Object.cpp          |  4 ++--
 extensions/aws/processors/S3Processor.cpp          |  2 +-
 extensions/aws/s3/MultipartUploadStateStorage.cpp  | 10 ++++-----
 extensions/civetweb/processors/ListenHTTP.cpp      |  2 +-
 .../controllerservices/CouchbaseClusterService.cpp |  2 +-
 extensions/kafka/ConsumeKafka.cpp                  |  2 +-
 extensions/mqtt/processors/ConsumeMQTT.cpp         |  2 +-
 extensions/mqtt/processors/PublishMQTT.cpp         |  2 +-
 extensions/rocksdb-repos/ProvenanceRepository.cpp  |  2 +-
 .../controllers/RocksDbStateStorage.cpp            |  6 +++---
 extensions/smb/SmbConnectionControllerService.cpp  |  4 ++--
 .../controllers/JsonRecordSetWriter.cpp            |  4 ++--
 .../standard-processors/modbus/FetchModbusTcp.cpp  |  2 +-
 .../processors/AppendHostInfo.cpp                  |  4 ++--
 .../processors/AttributesToJSON.cpp                |  4 ++--
 .../processors/DefragmentText.cpp                  |  2 +-
 .../standard-processors/processors/ExtractText.cpp |  2 +-
 .../standard-processors/processors/HashContent.cpp |  6 +++---
 .../standard-processors/processors/RouteText.cpp   |  6 +++---
 .../standard-processors/processors/TailFile.cpp    |  2 +-
 libminifi/src/RemoteProcessorGroupPort.cpp         |  2 +-
 libminifi/test/libtest/unit/MockClasses.h          |  2 +-
 libminifi/test/unit/ExpectedTest.cpp               |  6 +++---
 libminifi/test/unit/OptionalTest.cpp               |  8 +++++++
 utils/include/utils/OptionalUtils.h                | 25 +++++++++++++++++-----
 .../utils/detail/MonadicOperationWrappers.h        | 14 ++++++++++--
 utils/include/utils/expected.h                     | 16 ++++++++++++--
 28 files changed, 105 insertions(+), 60 deletions(-)

diff --git a/extension-utils/src/utils/ProcessorConfigUtils.cpp 
b/extension-utils/src/utils/ProcessorConfigUtils.cpp
index fc0125e97..8c53cef00 100644
--- a/extension-utils/src/utils/ProcessorConfigUtils.cpp
+++ b/extension-utils/src/utils/ProcessorConfigUtils.cpp
@@ -23,7 +23,7 @@
 namespace org::apache::nifi::minifi::utils {
 
 std::string parseProperty(const core::ProcessContext& ctx, const 
core::PropertyReference& property, const core::FlowFile* flow_file) {
-  return ctx.getProperty(property, flow_file) | 
utils::expect(fmt::format("Expected valid value from {}::{}", 
ctx.getProcessor().getName(), property.name));
+  return ctx.getProperty(property, flow_file) | 
utils::orThrow(fmt::format("Expected valid value from {}::{}", 
ctx.getProcessor().getName(), property.name));
 }
 
 std::optional<std::string> parseOptionalProperty(const core::ProcessContext& 
ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
@@ -32,13 +32,13 @@ std::optional<std::string> parseOptionalProperty(const 
core::ProcessContext& ctx
 
 std::optional<bool> parseOptionalBoolProperty(const core::ProcessContext& ctx, 
const core::PropertyReference& property, const core::FlowFile* flow_file) {
   if (const auto property_str = ctx.getProperty(property, flow_file)) {
-    return parsing::parseBool(*property_str) | 
utils::expect(fmt::format("Expected parsable bool from {}::{}", 
ctx.getProcessor().getName(), property.name));
+    return parsing::parseBool(*property_str) | 
utils::orThrow(fmt::format("Expected parsable bool from {}::{}", 
ctx.getProcessor().getName(), property.name));
   }
   return std::nullopt;
 }
 
 bool parseBoolProperty(const core::ProcessContext& ctx, const 
core::PropertyReference& property, const core::FlowFile* flow_file) {
-  return ctx.getProperty(property, flow_file) | 
utils::andThen(parsing::parseBool) | utils::expect(fmt::format("Expected 
parsable bool from {}::{}", ctx.getProcessor().getName(), property.name));
+  return ctx.getProperty(property, flow_file) | 
utils::andThen(parsing::parseBool) | utils::orThrow(fmt::format("Expected 
parsable bool from {}::{}", ctx.getProcessor().getName(), property.name));
 }
 
 std::optional<uint64_t> parseOptionalU64Property(const core::ProcessContext& 
ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
@@ -46,14 +46,14 @@ std::optional<uint64_t> parseOptionalU64Property(const 
core::ProcessContext& ctx
     if (property_str->empty()) {
         return std::nullopt;
     }
-    return parsing::parseIntegral<uint64_t>(*property_str) | 
utils::expect(fmt::format("Expected parsable uint64_t from {}::{}", 
ctx.getProcessor().getName(), property.name));
+    return parsing::parseIntegral<uint64_t>(*property_str) | 
utils::orThrow(fmt::format("Expected parsable uint64_t from {}::{}", 
ctx.getProcessor().getName(), property.name));
   }
 
   return std::nullopt;
 }
 
 uint64_t parseU64Property(const core::ProcessContext& ctx, const 
core::PropertyReference& property, const core::FlowFile* flow_file) {
-  return ctx.getProperty(property, flow_file) | 
utils::andThen(parsing::parseIntegral<uint64_t>) | 
utils::expect(fmt::format("Expected parsable uint64_t from {}::{}", 
ctx.getProcessor().getName(), property.name));
+  return ctx.getProperty(property, flow_file) | 
utils::andThen(parsing::parseIntegral<uint64_t>) | 
utils::orThrow(fmt::format("Expected parsable uint64_t from {}::{}", 
ctx.getProcessor().getName(), property.name));
 }
 
 std::optional<int64_t> parseOptionalI64Property(const core::ProcessContext& 
ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
@@ -61,14 +61,14 @@ std::optional<int64_t> parseOptionalI64Property(const 
core::ProcessContext& ctx,
     if (property_str->empty()) {
       return std::nullopt;
     }
-    return parsing::parseIntegral<int64_t>(*property_str) | 
utils::expect(fmt::format("Expected parsable int64_t from {}::{}", 
ctx.getProcessor().getName(), property.name));
+    return parsing::parseIntegral<int64_t>(*property_str) | 
utils::orThrow(fmt::format("Expected parsable int64_t from {}::{}", 
ctx.getProcessor().getName(), property.name));
   }
 
   return std::nullopt;
 }
 
 int64_t parseI64Property(const core::ProcessContext& ctx, const 
core::PropertyReference& property, const core::FlowFile* flow_file) {
-  return ctx.getProperty(property, flow_file) | 
utils::andThen(parsing::parseIntegral<int64_t>) | 
utils::expect(fmt::format("Expected parsable int64_t from {}::{}", 
ctx.getProcessor().getName(), property.name));
+  return ctx.getProperty(property, flow_file) | 
utils::andThen(parsing::parseIntegral<int64_t>) | 
utils::orThrow(fmt::format("Expected parsable int64_t from {}::{}", 
ctx.getProcessor().getName(), property.name));
 }
 
 std::optional<std::chrono::milliseconds> parseOptionalDurationProperty(const 
core::ProcessContext& ctx, const core::PropertyReference& property, const 
core::FlowFile* flow_file) {
@@ -76,14 +76,14 @@ std::optional<std::chrono::milliseconds> 
parseOptionalDurationProperty(const cor
     if (property_str->empty()) {
       return std::nullopt;
     }
-    return parsing::parseDuration(*property_str) | 
utils::expect(fmt::format("Expected parsable duration from {}::{}", 
ctx.getProcessor().getName(), property.name));
+    return parsing::parseDuration(*property_str) | 
utils::orThrow(fmt::format("Expected parsable duration from {}::{}", 
ctx.getProcessor().getName(), property.name));
   }
 
   return std::nullopt;
 }
 
 std::chrono::milliseconds parseDurationProperty(const core::ProcessContext& 
ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) {
-  return ctx.getProperty(property, flow_file) | 
utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | 
utils::expect(fmt::format("Expected parsable duration from {}::{}", 
ctx.getProcessor().getName(), property.name));
+  return ctx.getProperty(property, flow_file) | 
utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | 
utils::orThrow(fmt::format("Expected parsable duration from {}::{}", 
ctx.getProcessor().getName(), property.name));
 }
 
 std::optional<uint64_t> parseOptionalDataSizeProperty(const 
core::ProcessContext& ctx, const core::PropertyReference& property, const 
core::FlowFile* flow_file) {
@@ -91,14 +91,14 @@ std::optional<uint64_t> parseOptionalDataSizeProperty(const 
core::ProcessContext
     if (property_str->empty()) {
       return std::nullopt;
     }
-    return parsing::parseDataSize(*property_str) | 
utils::expect(fmt::format("Expected parsable data size from {}::{}", 
ctx.getProcessor().getName(), property.name));
+    return parsing::parseDataSize(*property_str) | 
utils::orThrow(fmt::format("Expected parsable data size from {}::{}", 
ctx.getProcessor().getName(), property.name));
   }
 
   return std::nullopt;
 }
 
 uint64_t parseDataSizeProperty(const core::ProcessContext& ctx, const 
core::PropertyReference& property, const core::FlowFile* flow_file) {
-  return ctx.getProperty(property, flow_file) | 
utils::andThen(parsing::parseDataSize) | utils::expect(fmt::format("Expected 
parsable data size from {}::{}", ctx.getProcessor().getName(), property.name));
+  return ctx.getProperty(property, flow_file) | 
utils::andThen(parsing::parseDataSize) | utils::orThrow(fmt::format("Expected 
parsable data size from {}::{}", ctx.getProcessor().getName(), property.name));
 }
 
 
diff --git a/extensions/aws/processors/PutS3Object.cpp 
b/extensions/aws/processors/PutS3Object.cpp
index 8fda0504a..02f76b19b 100644
--- a/extensions/aws/processors/PutS3Object.cpp
+++ b/extensions/aws/processors/PutS3Object.cpp
@@ -70,13 +70,13 @@ void PutS3Object::onSchedule(core::ProcessContext& context, 
core::ProcessSession
 
   multipart_threshold_ = context.getProperty(MultipartThreshold)
       | minifi::utils::andThen([&](const auto str) { return 
parsing::parseDataSizeMinMax(str, getMinPartSize(), getMaxUploadSize()); })
-      | minifi::utils::expect("Multipart Part Size is not between the valid 
5MB and 5GB range!");
+      | minifi::utils::orThrow("Multipart Part Size is not between the valid 
5MB and 5GB range!");
 
   logger_->log_debug("PutS3Object: Multipart Threshold {}", 
multipart_threshold_);
 
   multipart_size_ = context.getProperty(MultipartPartSize)
     | minifi::utils::andThen([&](const auto str) { return 
parsing::parseDataSizeMinMax(str, getMinPartSize(), getMaxUploadSize()); })
-    | minifi::utils::expect("Multipart Part Size is not between the valid 5MB 
and 5GB range!");
+    | minifi::utils::orThrow("Multipart Part Size is not between the valid 5MB 
and 5GB range!");
 
 
   logger_->log_debug("PutS3Object: Multipart Size {}", multipart_size_);
diff --git a/extensions/aws/processors/S3Processor.cpp 
b/extensions/aws/processors/S3Processor.cpp
index 70467d9e9..50351cec1 100644
--- a/extensions/aws/processors/S3Processor.cpp
+++ b/extensions/aws/processors/S3Processor.cpp
@@ -98,7 +98,7 @@ void S3Processor::onSchedule(core::ProcessContext& context, 
core::ProcessSession
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or 
invalid");
   }
 
-  client_config_->region = context.getProperty(Region) | 
minifi::utils::expect("Region property missing or invalid");
+  client_config_->region = context.getProperty(Region) | 
minifi::utils::orThrow("Region property missing or invalid");
   logger_->log_debug("S3Processor: Region [{}]", client_config_->region);
 
   if (auto communications_timeout = 
minifi::utils::parseOptionalDurationProperty(context, CommunicationsTimeout)) {
diff --git a/extensions/aws/s3/MultipartUploadStateStorage.cpp 
b/extensions/aws/s3/MultipartUploadStateStorage.cpp
index e654d5e95..ede5bf9e3 100644
--- a/extensions/aws/s3/MultipartUploadStateStorage.cpp
+++ b/extensions/aws/s3/MultipartUploadStateStorage.cpp
@@ -57,11 +57,11 @@ std::optional<MultipartUploadState> 
MultipartUploadStateStorage::getState(const
   MultipartUploadState state;
   state.upload_id = state_map[state_key + ".upload_id"];
 
-  state.upload_time_ms_since_epoch = 
parsing::parseIntegral<int64_t>(state_map[state_key + ".upload_time"]) | 
utils::expect(fmt::format("Expected parsable {}.upload_time", state_key));
-  state.uploaded_parts = parsing::parseIntegral<size_t>(state_map[state_key + 
".uploaded_parts"]) | utils::expect(fmt::format("Expected parsable 
{}.uploaded_parts", state_key));
-  state.uploaded_size = parsing::parseIntegral<uint64_t>(state_map[state_key + 
".uploaded_size"]) | utils::expect(fmt::format("Expected parsable 
{}.uploaded_size", state_key));
-  state.part_size = parsing::parseIntegral<uint64_t>(state_map[state_key + 
".part_size"]) | utils::expect(fmt::format("Expected parsable {}.part_size", 
state_key));
-  state.full_size = parsing::parseIntegral<uint64_t>(state_map[state_key + 
".full_size"]) | utils::expect(fmt::format("Expected parsable {}.full_size", 
state_key));
+  state.upload_time_ms_since_epoch = 
parsing::parseIntegral<int64_t>(state_map[state_key + ".upload_time"]) | 
utils::orThrow(fmt::format("Expected parsable {}.upload_time", state_key));
+  state.uploaded_parts = parsing::parseIntegral<size_t>(state_map[state_key + 
".uploaded_parts"]) | utils::orThrow(fmt::format("Expected parsable 
{}.uploaded_parts", state_key));
+  state.uploaded_size = parsing::parseIntegral<uint64_t>(state_map[state_key + 
".uploaded_size"]) | utils::orThrow(fmt::format("Expected parsable 
{}.uploaded_size", state_key));
+  state.part_size = parsing::parseIntegral<uint64_t>(state_map[state_key + 
".part_size"]) | utils::orThrow(fmt::format("Expected parsable {}.part_size", 
state_key));
+  state.full_size = parsing::parseIntegral<uint64_t>(state_map[state_key + 
".full_size"]) | utils::orThrow(fmt::format("Expected parsable {}.full_size", 
state_key));
   state.uploaded_etags = 
minifi::utils::string::splitAndTrimRemovingEmpty(state_map[state_key + 
".uploaded_etags"], ";");
   return state;
 }
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp 
b/extensions/civetweb/processors/ListenHTTP.cpp
index 39cf718fa..f71da2bee 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -40,7 +40,7 @@ void ListenHTTP::initialize() {
 }
 
 void ListenHTTP::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
-  std::string base_path = context.getProperty(BasePath) | 
utils::expect("ListenHTTP::BasePath has default value");
+  std::string base_path = context.getProperty(BasePath) | 
utils::orThrow("ListenHTTP::BasePath has default value");
 
   base_path.insert(0, "/");
 
diff --git 
a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp 
b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
index b097226aa..a86702b98 100644
--- a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
+++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
@@ -216,7 +216,7 @@ void CouchbaseClusterService::initialize() {
 }
 
 void CouchbaseClusterService::onEnable() {
-  std::string connection_string = getProperty(ConnectionString.name) | 
utils::expect("required property");
+  std::string connection_string = getProperty(ConnectionString.name) | 
utils::orThrow("required property");
   std::string username = getProperty(UserName.name).value_or("");
   std::string password = getProperty(UserPassword.name).value_or("");
 
diff --git a/extensions/kafka/ConsumeKafka.cpp 
b/extensions/kafka/ConsumeKafka.cpp
index 6cd444f78..1f0f12558 100644
--- a/extensions/kafka/ConsumeKafka.cpp
+++ b/extensions/kafka/ConsumeKafka.cpp
@@ -142,7 +142,7 @@ void 
ConsumeKafka::extend_config_from_dynamic_properties(const core::ProcessCont
   }
   logger_->log_info("Loading {} extra kafka configuration fields from 
ConsumeKafka dynamic properties:", dynamic_prop_keys.size());
   for (const std::string& key : dynamic_prop_keys) {
-    std::string value = context.getDynamicProperty(key) | utils::expect("");
+    std::string value = context.getDynamicProperty(key) | 
utils::orThrow(fmt::format("This shouldn't happen, dynamic property {} is 
expected because we just queried the list of dynamic properties", key));
     logger_->log_info("{}: {}", key.c_str(), value.c_str());
     setKafkaConfigurationField(*conf_, key, value);
   }
diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp 
b/extensions/mqtt/processors/ConsumeMQTT.cpp
index e4e5b1a72..9182a3969 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.cpp
+++ b/extensions/mqtt/processors/ConsumeMQTT.cpp
@@ -206,7 +206,7 @@ void ConsumeMQTT::resolveTopicFromAlias(SmartMessage& 
smart_message) {
 
 void ConsumeMQTT::checkProperties() {
   auto is_property_explicitly_set = [this](const std::string_view 
property_name) -> bool {
-    const auto property_values = getAllPropertyValues(property_name) | 
utils::expect("It should only be called on valid property");
+    const auto property_values = getAllPropertyValues(property_name) | 
utils::orThrow("It should only be called on valid property");
     return !property_values.empty();
   };
   if (mqtt_version_ == mqtt::MqttVersions::V_3_1_0 || mqtt_version_ == 
mqtt::MqttVersions::V_3_1_1 || mqtt_version_ == mqtt::MqttVersions::V_3X_AUTO) {
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp 
b/extensions/mqtt/processors/PublishMQTT.cpp
index f9bcd1970..d71ce21a2 100644
--- a/extensions/mqtt/processors/PublishMQTT.cpp
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -133,7 +133,7 @@ bool PublishMQTT::sendMessage(const std::vector<std::byte>& 
buffer, const std::s
 
 void PublishMQTT::checkProperties() {
   auto is_property_explicitly_set = [this](const std::string_view 
property_name) -> bool {
-    const auto property_values = getAllPropertyValues(property_name) | 
utils::expect("It should only be called on valid property");
+    const auto property_values = getAllPropertyValues(property_name) | 
utils::orThrow("It should only be called on valid property");
     return !property_values.empty();
   };
   if ((mqtt_version_ == mqtt::MqttVersions::V_3_1_0 || mqtt_version_ == 
mqtt::MqttVersions::V_3_1_1 || mqtt_version_ == mqtt::MqttVersions::V_3X_AUTO)) 
{
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp 
b/extensions/rocksdb-repos/ProvenanceRepository.cpp
index 9502e0365..9b5571fc0 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.cpp
+++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -30,7 +30,7 @@ bool ProvenanceRepository::initialize(const 
std::shared_ptr<org::apache::nifi::m
   }
   logger_->log_debug("MiNiFi Provenance Repository Directory {}", directory_);
   if (config->get(Configure::nifi_provenance_repository_max_storage_size, 
value)) {
-    max_partition_bytes_ = gsl::narrow<int64_t>(parsing::parseDataSize(value) 
| utils::expect("expected parsable data size"));
+    max_partition_bytes_ = gsl::narrow<int64_t>(parsing::parseDataSize(value) 
| utils::orThrow("expected parsable data size"));
   }
   logger_->log_debug("MiNiFi Provenance Max Partition Bytes {}", 
max_partition_bytes_);
   if (config->get(Configure::nifi_provenance_repository_max_storage_time, 
value)) {
diff --git a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp 
b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
index 7554eebbd..db1382988 100644
--- a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
+++ b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
@@ -47,15 +47,15 @@ void RocksDbStateStorage::onEnable() {
 
   const auto always_persist = getProperty(AlwaysPersist.name)
       | utils::andThen(parsing::parseBool)
-      | utils::expect("RocksDbStateStorage::AlwaysPersist has default value");
+      | utils::orThrow("RocksDbStateStorage::AlwaysPersist has default value");
   logger_->log_info("Always Persist property: {}", always_persist);
 
   const auto auto_persistence_interval = 
getProperty(AutoPersistenceInterval.name)
       | utils::andThen(parsing::parseDuration<std::chrono::milliseconds>)
-      | utils::expect("RocksDbStateStorage::AutoPersistenceInterval has 
default value");
+      | utils::orThrow("RocksDbStateStorage::AutoPersistenceInterval has 
default value");
   logger_->log_info("Auto Persistence Interval property: {}", 
auto_persistence_interval);
 
-  directory_ = getProperty(Directory.name) | 
utils::expect("RocksDbStateStorage::Directory is required property");
+  directory_ = getProperty(Directory.name) | 
utils::orThrow("RocksDbStateStorage::Directory is required property");
 
   auto_persistor_.start(always_persist, auto_persistence_interval, [this] { 
return persistNonVirtual(); });
   db_.reset();
diff --git a/extensions/smb/SmbConnectionControllerService.cpp 
b/extensions/smb/SmbConnectionControllerService.cpp
index 2a9349310..87876dd95 100644
--- a/extensions/smb/SmbConnectionControllerService.cpp
+++ b/extensions/smb/SmbConnectionControllerService.cpp
@@ -28,8 +28,8 @@ void SmbConnectionControllerService::initialize() {
 }
 
 void SmbConnectionControllerService::onEnable()  {
-  std::string hostname = getProperty(Hostname.name) | utils::expect("Required 
property");
-  std::string share = getProperty(Share.name) | utils::expect("Required 
property");
+  std::string hostname = getProperty(Hostname.name) | utils::orThrow("Required 
property");
+  std::string share = getProperty(Share.name) | utils::orThrow("Required 
property");
 
   server_path_ = "\\\\" + hostname + "\\" + share;
 
diff --git a/extensions/standard-processors/controllers/JsonRecordSetWriter.cpp 
b/extensions/standard-processors/controllers/JsonRecordSetWriter.cpp
index e648dd41d..0f9e37de7 100644
--- a/extensions/standard-processors/controllers/JsonRecordSetWriter.cpp
+++ b/extensions/standard-processors/controllers/JsonRecordSetWriter.cpp
@@ -70,8 +70,8 @@ rapidjson::Value toJson(const core::RecordObject& field, 
rapidjson::Document::Al
 }  // namespace
 
 void JsonRecordSetWriter::onEnable() {
-  output_grouping_ = getProperty(OutputGrouping.name) | 
utils::andThen(parsing::parseEnum<OutputGroupingType>) | 
utils::expect("JsonRecordSetWriter::OutputGrouping is required property");
-  pretty_print_ = getProperty(PrettyPrint.name) | 
utils::andThen(parsing::parseBool) | utils::expect("Missing 
JsonRecordSetWriter::PrettyPrint despite default value");
+  output_grouping_ = getProperty(OutputGrouping.name) | 
utils::andThen(parsing::parseEnum<OutputGroupingType>) | 
utils::orThrow("JsonRecordSetWriter::OutputGrouping is required property");
+  pretty_print_ = getProperty(PrettyPrint.name) | 
utils::andThen(parsing::parseBool) | utils::orThrow("Missing 
JsonRecordSetWriter::PrettyPrint despite default value");
 }
 
 void JsonRecordSetWriter::writePerLine(const core::RecordSet& record_set, 
const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& 
session) {
diff --git a/extensions/standard-processors/modbus/FetchModbusTcp.cpp 
b/extensions/standard-processors/modbus/FetchModbusTcp.cpp
index 489712b91..516a3e9a4 100644
--- a/extensions/standard-processors/modbus/FetchModbusTcp.cpp
+++ b/extensions/standard-processors/modbus/FetchModbusTcp.cpp
@@ -52,7 +52,7 @@ void FetchModbusTcp::onSchedule(core::ProcessContext& 
context, core::ProcessSess
   timeout_duration_ = utils::parseOptionalDurationProperty(context, 
Timeout).value_or(15s);
 
 
-  if (context.getProperty(ConnectionPerFlowFile) | 
utils::andThen(parsing::parseBool) | 
utils::expect("FetchModbusTcp::ConnectionPerFlowFile is required property")) {
+  if (context.getProperty(ConnectionPerFlowFile) | 
utils::andThen(parsing::parseBool) | 
utils::orThrow("FetchModbusTcp::ConnectionPerFlowFile is required property")) {
     connections_.reset();
   } else {
     connections_.emplace();
diff --git a/extensions/standard-processors/processors/AppendHostInfo.cpp 
b/extensions/standard-processors/processors/AppendHostInfo.cpp
index 920d9b80d..dfd2a610b 100644
--- a/extensions/standard-processors/processors/AppendHostInfo.cpp
+++ b/extensions/standard-processors/processors/AppendHostInfo.cpp
@@ -43,8 +43,8 @@ void AppendHostInfo::initialize() {
 
 void AppendHostInfo::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
   std::unique_lock unique_lock(shared_mutex_);
-  hostname_attribute_name_ = context.getProperty(HostAttribute) | 
utils::expect("HostAttribute has default value");
-  ipaddress_attribute_name_ = context.getProperty(IPAttribute) | 
utils::expect("IPAttribute has default value");
+  hostname_attribute_name_ = context.getProperty(HostAttribute) | 
utils::orThrow("HostAttribute has default value");
+  ipaddress_attribute_name_ = context.getProperty(IPAttribute) | 
utils::orThrow("IPAttribute has default value");
   interface_name_filter_ = context.getProperty(InterfaceNameFilter)
       | utils::toOptional()
       | utils::filter([](const std::string& inf) { return !inf.empty(); });
diff --git a/extensions/standard-processors/processors/AttributesToJSON.cpp 
b/extensions/standard-processors/processors/AttributesToJSON.cpp
index ebff3e9dc..db91ee6a7 100644
--- a/extensions/standard-processors/processors/AttributesToJSON.cpp
+++ b/extensions/standard-processors/processors/AttributesToJSON.cpp
@@ -49,10 +49,10 @@ void AttributesToJSON::onSchedule(core::ProcessContext& 
context, core::ProcessSe
 
   include_core_attributes_ = context.getProperty(IncludeCoreAttributes)
       | utils::andThen(parsing::parseBool)
-      | utils::expect("AttributesToJSON::IncludeCoreAttributes should be 
available in onSchedule");
+      | utils::orThrow("AttributesToJSON::IncludeCoreAttributes should be 
available in onSchedule");
   null_value_ = context.getProperty(NullValue)
       | utils::andThen(parsing::parseBool)
-      | utils::expect("AttributesToJSON::NullValue should be available in 
onSchedule");
+      | utils::orThrow("AttributesToJSON::NullValue should be available in 
onSchedule");
 }
 
 bool AttributesToJSON::isCoreAttributeToBeFiltered(const std::string& 
attribute) const {
diff --git a/extensions/standard-processors/processors/DefragmentText.cpp 
b/extensions/standard-processors/processors/DefragmentText.cpp
index 9adc63945..a85a47391 100644
--- a/extensions/standard-processors/processors/DefragmentText.cpp
+++ b/extensions/standard-processors/processors/DefragmentText.cpp
@@ -53,7 +53,7 @@ void DefragmentText::onSchedule(core::ProcessContext& 
context, core::ProcessSess
 
   pattern_ = context.getProperty(Pattern)
     | utils::transform([](const auto pattern_str) { return 
utils::Regex{pattern_str}; })
-    | utils::expect("Pattern property missing or invalid");
+    | utils::orThrow("Pattern property missing or invalid");
 }
 
 void DefragmentText::onTrigger(core::ProcessContext&, core::ProcessSession& 
session) {
diff --git a/extensions/standard-processors/processors/ExtractText.cpp 
b/extensions/standard-processors/processors/ExtractText.cpp
index 105abcc1e..45693a8a7 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -63,7 +63,7 @@ int64_t ExtractText::ReadCallback::operator()(const 
std::shared_ptr<io::InputStr
 
   std::string attrKey = ctx_->getProperty(Attribute).value_or("");
   std::string sizeLimitStr = ctx_->getProperty(SizeLimit).value_or("");
-  bool regex_mode = ctx_->getProperty(RegexMode) | 
utils::andThen(parsing::parseBool) | utils::expect("Missing 
ExtractText::RegexMode despite default value");
+  bool regex_mode = ctx_->getProperty(RegexMode) | 
utils::andThen(parsing::parseBool) | utils::orThrow("Missing 
ExtractText::RegexMode despite default value");
 
   if (sizeLimitStr.empty())
     sizeLimitStr = DEFAULT_SIZE_LIMIT_STR;
diff --git a/extensions/standard-processors/processors/HashContent.cpp 
b/extensions/standard-processors/processors/HashContent.cpp
index 269d1b4cc..b7d199d78 100644
--- a/extensions/standard-processors/processors/HashContent.cpp
+++ b/extensions/standard-processors/processors/HashContent.cpp
@@ -39,11 +39,11 @@ void HashContent::initialize() {
 }
 
 void HashContent::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
-  attrKey_ = context.getProperty(HashAttribute) | utils::expect("Missing 
HashContent::HashAttribute despite default value");
-  failOnEmpty_ = context.getProperty(FailOnEmpty) | 
utils::andThen(parsing::parseBool) | utils::expect("Missing 
HashContent::FailOnEmpty despite default value");
+  attrKey_ = context.getProperty(HashAttribute) | utils::orThrow("Missing 
HashContent::HashAttribute despite default value");
+  failOnEmpty_ = context.getProperty(FailOnEmpty) | 
utils::andThen(parsing::parseBool) | utils::orThrow("Missing 
HashContent::FailOnEmpty despite default value");
 
   {
-    std::string algo_name = context.getProperty(HashAlgorithm) | 
utils::expect("HashContent::HashAlgorithm is required property");
+    std::string algo_name = context.getProperty(HashAlgorithm) | 
utils::orThrow("HashContent::HashAlgorithm is required property");
     std::transform(algo_name.begin(), algo_name.end(), algo_name.begin(), 
::toupper);
     std::erase(algo_name, '-');
     if (!HashAlgos.contains(algo_name)) {
diff --git a/extensions/standard-processors/processors/RouteText.cpp 
b/extensions/standard-processors/processors/RouteText.cpp
index 1688f638d..205ec5566 100644
--- a/extensions/standard-processors/processors/RouteText.cpp
+++ b/extensions/standard-processors/processors/RouteText.cpp
@@ -163,7 +163,7 @@ class RouteText::MatchingContext {
     if (it != regex_values_.end()) {
       return it->second;
     }
-    const auto value = process_context_.getDynamicProperty(property_name, 
flow_file_.get()) | utils::expect("Missing dynamic property");
+    const auto value = process_context_.getDynamicProperty(property_name, 
flow_file_.get()) | utils::orThrow("Missing dynamic property");
     std::vector<utils::Regex::Mode> flags;
     if (case_policy_ == route_text::CasePolicy::IGNORE_CASE) {
       flags.push_back(utils::Regex::Mode::ICASE);
@@ -176,7 +176,7 @@ class RouteText::MatchingContext {
     if (it != string_values_.end()) {
       return it->second;
     }
-    const auto value = process_context_.getDynamicProperty(property_name, 
flow_file_.get()) | utils::expect("Missing dynamic property");
+    const auto value = process_context_.getDynamicProperty(property_name, 
flow_file_.get()) | utils::orThrow("Missing dynamic property");
     return (string_values_[property_name] = value);
   }
 
@@ -185,7 +185,7 @@ class RouteText::MatchingContext {
     if (it != searcher_values_.end()) {
       return it->second.searcher_;
     }
-    const auto value = process_context_.getDynamicProperty(property_name, 
flow_file_.get()) | utils::expect("Missing dynamic property");
+    const auto value = process_context_.getDynamicProperty(property_name, 
flow_file_.get()) | utils::orThrow("Missing dynamic property");
 
     return searcher_values_.emplace(
         std::piecewise_construct, std::forward_as_tuple(property_name),
diff --git a/extensions/standard-processors/processors/TailFile.cpp 
b/extensions/standard-processors/processors/TailFile.cpp
index 9cf889c13..c1d714445 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -797,7 +797,7 @@ std::string TailFile::baseDirectoryFromAttributes(const 
controllers::AttributePr
   for (const auto& [key, value] : attribute_map) {
     flow_file->setAttribute(key, value);
   }
-  return context.getProperty(BaseDirectory, flow_file.get()) | 
utils::expect("Base directory is required for multiple tail mode.");
+  return context.getProperty(BaseDirectory, flow_file.get()) | 
utils::orThrow("Base directory is required for multiple tail mode.");
 }
 
 std::chrono::milliseconds TailFile::getLookupFrequency() const {
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp 
b/libminifi/src/RemoteProcessorGroupPort.cpp
index 608b2afb0..ba340685d 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -131,7 +131,7 @@ void 
RemoteProcessorGroupPort::onSchedule(core::ProcessContext& context, core::P
     }
   }
 
-  idle_timeout_ = context.getProperty(idleTimeout) | 
utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | 
utils::expect("RemoteProcessGroupPort::idleTimeout has default value");
+  idle_timeout_ = context.getProperty(idleTimeout) | 
utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | 
utils::orThrow("RemoteProcessGroupPort::idleTimeout has default value");
 
   std::lock_guard<std::mutex> lock(peer_mutex_);
   if (!nifi_instances_.empty()) {
diff --git a/libminifi/test/libtest/unit/MockClasses.h 
b/libminifi/test/libtest/unit/MockClasses.h
index ff89d1fc6..a38791d38 100644
--- a/libminifi/test/libtest/unit/MockClasses.h
+++ b/libminifi/test/libtest/unit/MockClasses.h
@@ -130,7 +130,7 @@ class MockProcessor : public minifi::core::ProcessorImpl {
       // and verify that we can execute it.
     }
 
-    bool in_sub_process_group = getProperty("InSubProcessGroup") | 
minifi::utils::andThen(minifi::parsing::parseBool) | minifi::utils::expect("");
+    bool in_sub_process_group = getProperty("InSubProcessGroup") | 
minifi::utils::andThen(minifi::parsing::parseBool) | minifi::utils::orThrow("");
     auto sub_service = context.getControllerService("SubMockController", 
getUUID());
     if (in_sub_process_group) {
       REQUIRE(nullptr != sub_service);
diff --git a/libminifi/test/unit/ExpectedTest.cpp 
b/libminifi/test/unit/ExpectedTest.cpp
index 8d6463786..10907c7d7 100644
--- a/libminifi/test/unit/ExpectedTest.cpp
+++ b/libminifi/test/unit/ExpectedTest.cpp
@@ -554,10 +554,10 @@ TEST_CASE("expected toOptional") {
   CHECK(res2 == 5);
 }
 
-TEST_CASE("expected expect") {
+TEST_CASE("expected orThrow") {
   nonstd::expected<int, std::string> unexpected{nonstd::unexpect, "hello"};
   nonstd::expected<int, std::string> expected{5};
 
-  REQUIRE_THROWS_WITH(std::move(unexpected) | utils::expect("should throw"), 
"should throw");
-  CHECK((std::move(expected) | utils::expect("should be 5")) == 5);
+  REQUIRE_THROWS_WITH(std::move(unexpected) | utils::orThrow("should throw"), 
"should throw: hello");
+  CHECK((std::move(expected) | utils::orThrow("should be 5")) == 5);
 }
diff --git a/libminifi/test/unit/OptionalTest.cpp 
b/libminifi/test/unit/OptionalTest.cpp
index d5cff9335..5d28da01c 100644
--- a/libminifi/test/unit/OptionalTest.cpp
+++ b/libminifi/test/unit/OptionalTest.cpp
@@ -93,3 +93,11 @@ TEST_CASE("optional toExpected") {
   CHECK(expected_from_null_opt_ec.error() == 
std::make_error_code(std::io_errc::stream));
   CHECK(expected_from_null_opt_int.error() == 9);
 }
+
+TEST_CASE("optional orThrow") {
+  std::optional<int> opt_with_value = 5;
+  std::optional<int> opt_without_value = std::nullopt;
+
+  REQUIRE_THROWS_WITH(opt_without_value | utils::orThrow("should throw"), 
"should throw");
+  CHECK((opt_with_value | utils::orThrow("should be 5")) == 5);
+}
diff --git a/utils/include/utils/OptionalUtils.h 
b/utils/include/utils/OptionalUtils.h
index 30451b644..907990496 100644
--- a/utils/include/utils/OptionalUtils.h
+++ b/utils/include/utils/OptionalUtils.h
@@ -15,18 +15,19 @@
  * limitations under the License.
  */
 
-#ifndef LIBMINIFI_INCLUDE_UTILS_OPTIONALUTILS_H_
-#define LIBMINIFI_INCLUDE_UTILS_OPTIONALUTILS_H_
+#pragma once
 
 #include <functional>
 #include <optional>
 #include <type_traits>
 #include <utility>
+#include <iostream>
 
 #include "nonstd/expected.hpp"
 #include "utils/GeneralUtils.h"
 #include "utils/gsl.h"
 #include "utils/detail/MonadicOperationWrappers.h"
+#include "fmt/format.h"
 
 namespace org::apache::nifi::minifi::utils {
 
@@ -134,8 +135,22 @@ nonstd::expected<T, E> operator|(std::optional<T> object, 
to_expected_wrapper<E>
   }
   return std::move(*object);
 }
-}  // namespace detail
-}  // namespace org::apache::nifi::minifi::utils
 
-#endif  // LIBMINIFI_INCLUDE_UTILS_OPTIONALUTILS_H_
+template<typename T>
+T&& operator|(std::optional<T> object, const or_throw_wrapper e) {
+  if (object) {
+    return std::move(*object);
+  }
+  throw std::runtime_error(e.reason);
+}
 
+template<typename T>
+T&& operator|(std::optional<T> object, const or_terminate_wrapper e) {
+  if (object) {
+    return std::move(*object);
+  }
+  std::cerr << fmt::format("Aborting due to {}", e.reason) << std::endl;
+  std::abort();
+}
+}  // namespace detail
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/utils/include/utils/detail/MonadicOperationWrappers.h 
b/utils/include/utils/detail/MonadicOperationWrappers.h
index 6d94da403..7c491643b 100644
--- a/utils/include/utils/detail/MonadicOperationWrappers.h
+++ b/utils/include/utils/detail/MonadicOperationWrappers.h
@@ -57,7 +57,11 @@ struct to_expected_wrapper {
   E error;
 };
 
-struct expect_wrapper {
+struct or_throw_wrapper {
+  std::string reason;
+};
+
+struct or_terminate_wrapper {
   std::string reason;
 };
 }  // namespace detail
@@ -117,7 +121,13 @@ inline detail::to_optional_wrapper toOptional() noexcept { 
return {}; }
  * For optional-like types, returns the present value or throws with the 
provided message
  * It is recommended that expect messages are used to describe the reason you 
expect the optional-like to have value.
  */
-inline detail::expect_wrapper expect(std::string&& exception_message) noexcept 
{ return {std::forward<std::string>(std::move(exception_message))}; }
+inline detail::or_throw_wrapper orThrow(std::string&& exception_message) 
noexcept { return {std::forward<std::string>(std::move(exception_message))}; }
+
+/**
+ * For optional-like types, returns the present value or aborts with the 
provided message
+ */
+inline detail::or_terminate_wrapper orTerminate(std::string&& 
exception_message) noexcept { return 
{std::forward<std::string>(std::move(exception_message))}; }
+
 
 /**
  * For optional-like types, only keep the present value if it satisfies the 
predicate
diff --git a/utils/include/utils/expected.h b/utils/include/utils/expected.h
index 0ddd40012..b2d44949d 100644
--- a/utils/include/utils/expected.h
+++ b/utils/include/utils/expected.h
@@ -18,8 +18,11 @@
 #include <type_traits>
 #include <utility>
 #include <optional>
+#include <iostream>
+
 #include "nonstd/expected.hpp"
 #include "utils/detail/MonadicOperationWrappers.h"
+#include "fmt/format.h"
 
 namespace org::apache::nifi::minifi::utils {
 namespace detail {
@@ -191,11 +194,20 @@ std::optional<typename 
std::remove_cvref_t<Expected>::value_type> operator|(Expe
 }
 
 template<expected Expected>
-typename std::remove_cvref_t<Expected>::value_type operator|(Expected&& 
object, expect_wrapper e) {
+typename std::remove_cvref_t<Expected>::value_type operator|(Expected&& 
object, or_throw_wrapper e) {
+  if (object) {
+    return std::move(*object);
+  }
+  throw std::runtime_error(fmt::format("{}: {}", e.reason, object.error()));
+}
+
+template<expected Expected>
+typename std::remove_cvref_t<Expected>::value_type operator|(Expected&& 
object, detail::or_terminate_wrapper e) {
   if (object) {
     return std::move(*object);
   }
-  throw std::runtime_error(e.reason);
+  std::cerr << fmt::format("Aborting due to {}: {}", e.reason, object.error()) 
<< std::endl;
+  std::abort();
 }
 }  // namespace detail
 


Reply via email to