martinzink commented on code in PR #2153:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2153#discussion_r3413158000
##########
extensions/gcp/processors/FetchGCSObject.cpp:
##########
@@ -82,80 +80,78 @@ class FetchFromGCSCallback {
};
} // namespace
-
-void FetchGCSObject::initialize() {
- setSupportedProperties(Properties);
- setSupportedRelationships(Relationships);
-}
-
-void FetchGCSObject::onSchedule(core::ProcessContext& context,
core::ProcessSessionFactory& session_factory) {
- GCSProcessor::onSchedule(context, session_factory);
- if (auto encryption_key = context.getProperty(EncryptionKey)) {
+MinifiStatus FetchGCSObject::onScheduleImpl(api::core::ProcessContext&
context) {
+ const auto status = GCSProcessor::onScheduleImpl(context);
+ if (MINIFI_STATUS_SUCCESS != status) {
+ return status;
+ }
+ if (auto encryption_key = context.getProperty(EncryptionKey, nullptr)) {
try {
encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key);
} catch (const google::cloud::RuntimeStatusError&) {
- throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION,
"Could not decode the base64-encoded encryption key from property " +
std::string(EncryptionKey.name)); }
+ logger_->log_error("Could not decode the base64-encoded encryption key
from property {}", std::string(EncryptionKey.name));
+ return MINIFI_STATUS_UNKNOWN_ERROR;
+ }
}
+ return MINIFI_STATUS_SUCCESS;
}
-void FetchGCSObject::onTrigger(core::ProcessContext& context,
core::ProcessSession& session) {
+MinifiStatus FetchGCSObject::onTriggerImpl(api::core::ProcessContext& context,
api::core::ProcessSession& session) {
gsl_Expects(gcp_credentials_);
auto flow_file = session.get();
if (!flow_file) {
- context.yield();
- return;
+ return MINIFI_STATUS_PROCESSOR_YIELD;
}
- auto bucket = context.getProperty(Bucket, flow_file.get());
+ auto bucket = api::utils::parseOptionalProperty(context, Bucket, &flow_file);
if (!bucket || bucket->empty()) {
logger_->log_error("Missing bucket name");
- session.transfer(flow_file, Failure);
- return;
+ session.transfer(std::move(flow_file), Failure);
+ return MINIFI_STATUS_SUCCESS;
}
- auto object_name = context.getProperty(Key, flow_file.get());
+ auto object_name = api::utils::parseOptionalProperty(context, Key,
&flow_file);
if (!object_name || object_name->empty()) {
logger_->log_error("Missing object name");
- session.transfer(flow_file, Failure);
- return;
+ session.transfer(std::move(flow_file), Failure);
+ return MINIFI_STATUS_SUCCESS;
}
gcs::Client client = getClient();
FetchFromGCSCallback callback(client, *bucket, *object_name);
callback.setEncryptionKey(encryption_key_);
- if (const auto object_generation_str =
context.getProperty(ObjectGeneration, flow_file.get()); object_generation_str
&& !object_generation_str->empty()) {
+ if (const auto object_generation_str =
api::utils::parseOptionalProperty(context, ObjectGeneration, &flow_file);
object_generation_str && !object_generation_str->empty()) {
if (const auto geni64 =
parsing::parseIntegral<int64_t>(*object_generation_str)) {
gcs::Generation generation = gcs::Generation{*geni64};
callback.setGeneration(generation);
} else {
logger_->log_error("Invalid generation: {}", *object_generation_str);
- session.transfer(flow_file, Failure);
- return;
+ session.transfer(std::move(flow_file), Failure);
+ return MINIFI_STATUS_SUCCESS;
}
}
session.write(flow_file, std::ref(callback));
if (!callback.getStatus().ok()) {
- flow_file->setAttribute(GCS_STATUS_MESSAGE,
callback.getStatus().message());
- flow_file->setAttribute(GCS_ERROR_REASON,
callback.getStatus().error_info().reason());
- flow_file->setAttribute(GCS_ERROR_DOMAIN,
callback.getStatus().error_info().domain());
+ session.setAttribute(flow_file, GCS_STATUS_MESSAGE,
callback.getStatus().message());
+ session.setAttribute(flow_file, GCS_ERROR_REASON,
callback.getStatus().error_info().reason());
+ session.setAttribute(flow_file, GCS_ERROR_DOMAIN,
callback.getStatus().error_info().domain());
logger_->log_error("Failed to fetch from Google Cloud Storage {} {}",
callback.getStatus().message(), callback.getStatus().error_info().reason());
- session.transfer(flow_file, Failure);
- return;
+ session.transfer(std::move(flow_file), Failure);
+ return MINIFI_STATUS_SUCCESS;
}
if (auto generation = callback.getGeneration())
- flow_file->setAttribute(GCS_GENERATION, std::to_string(*generation));
+ session.setAttribute(flow_file, GCS_GENERATION,
std::to_string(*generation));
if (auto meta_generation = callback.getMetaGeneration())
- flow_file->setAttribute(GCS_META_GENERATION,
std::to_string(*meta_generation));
+ session.setAttribute(flow_file, GCS_META_GENERATION,
std::to_string(*meta_generation));
if (auto storage_class = callback.getStorageClass())
- flow_file->setAttribute(GCS_STORAGE_CLASS, *storage_class);
- session.transfer(flow_file, Success);
+ session.setAttribute(flow_file, GCS_STORAGE_CLASS, *storage_class);
Review Comment:
👍
https://github.com/apache/nifi-minifi-cpp/commit/ef5127edb96e2fb6b2976d6e9a765a6edfe4dce4#diff-83c70f4cfe2b0c8cdef254a079398bb7f6b386c9a5dd55367239ab33d257f991R144-R152
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]