martinzink commented on a change in pull request #1178:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1178#discussion_r719326342
##########
File path: extensions/azure/processors/PutAzureDataLakeStorage.cpp
##########
@@ -72,19 +72,26 @@ void PutAzureDataLakeStorage::initialize() {
}
void PutAzureDataLakeStorage::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
- connection_string_ = getConnectionStringFromControllerService(context);
- if (connection_string_.empty()) {
+ auto credentials = getCredentialsFromControllerService(context);
+ if (!credentials) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials
Service property missing or invalid");
}
+ if ((!credentials->getUseManagedIdentityCredentials() &&
credentials->buildConnectionString().empty()) ||
+ (credentials->getUseManagedIdentityCredentials() &&
credentials->getStorageAccountName().empty())) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials
Service properties are not set or invalid");
+ }
+
+ credentials_ = *credentials;
Review comment:
This onSchedule also sets members but isnt protected by the mutex
##########
File path: extensions/azure/processors/PutAzureBlobStorage.cpp
##########
@@ -179,53 +232,43 @@ void PutAzureBlobStorage::onTrigger(const
std::shared_ptr<core::ProcessContext>
return;
}
- auto connection_string = getConnectionString(context, flow_file);
- if (connection_string.empty()) {
- logger_->log_error("Connection string is empty!");
- session->transfer(flow_file, Failure);
- return;
- }
-
- std::string container_name;
- if (!context->getProperty(ContainerName, container_name, flow_file) ||
container_name.empty()) {
- logger_->log_error("Container Name is invalid or empty!");
- session->transfer(flow_file, Failure);
- return;
- }
-
- std::string blob_name;
- if (!context->getProperty(Blob, blob_name, flow_file) || blob_name.empty()) {
- logger_->log_error("Blob name is invalid or empty!");
+ auto params = buildAzureBlobStorageParameters(context, flow_file);
+ if (!params) {
session->transfer(flow_file, Failure);
return;
}
std::optional<storage::UploadBlobResult> upload_result;
{
// TODO(lordgamez): This can be removed after maximum allowed threads are
implemented. See https://issues.apache.org/jira/browse/MINIFICPP-1566
+ // When used in multithreaded environment make sure to use the
azure_storage_mutex_ to lock the wrapper so the
+ // client is not reset with different configuration while another thread
is using it.
std::lock_guard<std::mutex> lock(azure_storage_mutex_);
- createAzureStorageClient(connection_string, container_name);
if (create_container_) {
Review comment:
onSchedule sets this create_container_ member but onSchedule is not
protected by the azure_storage_mutex_
Could that cause unexpected behaviour in multithreaded enviroment?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]