szaszm commented on code in PR #1706:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1706#discussion_r1446307739
##########
extensions/standard-processors/processors/GenerateFlowFile.cpp:
##########
@@ -57,58 +56,78 @@ void generateData(std::vector<char>& data, bool textData =
false) {
}
}
-void GenerateFlowFile::onSchedule(core::ProcessContext& context,
core::ProcessSessionFactory&) {
- if (context.getProperty(FileSize.name, fileSize_)) {
- logger_->log_trace("File size is configured to be {}", fileSize_);
- }
+GenerateFlowFile::Mode GenerateFlowFile::getMode(bool is_unique, bool is_text,
bool has_custom_text, uint64_t file_size) {
+ if (is_text && !is_unique && has_custom_text)
+ return Mode::CustomText;
- if (context.getProperty(BatchSize.name, batchSize_)) {
- logger_->log_trace("Batch size is configured to be {}", batchSize_);
- }
+ if (file_size == 0)
+ return Mode::Empty;
- std::string value;
- if (context.getProperty(DataFormat.name, value)) {
- textData_ = (value == GenerateFlowFile::DATA_FORMAT_TEXT);
+ if (is_unique) {
+ if (is_text)
+ return Mode::UniqueText;
+ else
+ return Mode::UniqueByte;
+ } else {
+ if (is_text)
+ return Mode::NotUniqueText;
+ else
+ return Mode::NotUniqueByte;
}
- if (context.getProperty(UniqueFlowFiles.name, uniqueFlowFile_)) {
- logger_->log_trace("Unique Flow files is configured to be {}",
uniqueFlowFile_);
+}
+
+void GenerateFlowFile::onSchedule(core::ProcessContext& context,
core::ProcessSessionFactory&) {
+ bool is_text = context.getProperty<std::string>(DataFormat)
+ | utils::transform([](const std::string& data_format) { return
data_format == DATA_FORMAT_TEXT;})
+ | utils::valueOrElse([]() {return false;});
+ bool is_unique = context.getProperty<bool>(UniqueFlowFiles) |
utils::valueOrElse([] { return true; });
+
+ auto custom_text_without_evaluation = context.getProperty(CustomText);
+ bool has_custom_text = custom_text_without_evaluation.has_value() &&
!custom_text_without_evaluation->empty();
+
+ context.getProperty(FileSize, file_size_);
+ context.getProperty(BatchSize, batch_size_);
+
+ mode_ = getMode(is_unique, is_text, has_custom_text, file_size_);
+
+ if (!isUnique(mode_)) {
+ non_unique_data_.resize(gsl::narrow<size_t>(file_size_));
+ generateData(non_unique_data_, isText(mode_));
}
+ logger_->log_trace("GenerateFlowFile is configured in {} mode",
magic_enum::enum_name(mode_));
+ if (mode_ != Mode::CustomText && has_custom_text)
+ logger_->log_warn("Custom Text property is set, but not used!");
Review Comment:
It may be worth clarifying why it's not used, or how to get it to be used.
##########
extensions/standard-processors/processors/GenerateFlowFile.cpp:
##########
@@ -57,58 +56,78 @@ void generateData(std::vector<char>& data, bool textData =
false) {
}
}
-void GenerateFlowFile::onSchedule(core::ProcessContext& context,
core::ProcessSessionFactory&) {
- if (context.getProperty(FileSize.name, fileSize_)) {
- logger_->log_trace("File size is configured to be {}", fileSize_);
- }
+GenerateFlowFile::Mode GenerateFlowFile::getMode(bool is_unique, bool is_text,
bool has_custom_text, uint64_t file_size) {
+ if (is_text && !is_unique && has_custom_text)
+ return Mode::CustomText;
- if (context.getProperty(BatchSize.name, batchSize_)) {
- logger_->log_trace("Batch size is configured to be {}", batchSize_);
- }
+ if (file_size == 0)
+ return Mode::Empty;
- std::string value;
- if (context.getProperty(DataFormat.name, value)) {
- textData_ = (value == GenerateFlowFile::DATA_FORMAT_TEXT);
+ if (is_unique) {
+ if (is_text)
+ return Mode::UniqueText;
+ else
+ return Mode::UniqueByte;
+ } else {
+ if (is_text)
+ return Mode::NotUniqueText;
+ else
+ return Mode::NotUniqueByte;
}
- if (context.getProperty(UniqueFlowFiles.name, uniqueFlowFile_)) {
- logger_->log_trace("Unique Flow files is configured to be {}",
uniqueFlowFile_);
+}
+
+void GenerateFlowFile::onSchedule(core::ProcessContext& context,
core::ProcessSessionFactory&) {
+ bool is_text = context.getProperty<std::string>(DataFormat)
+ | utils::transform([](const std::string& data_format) { return
data_format == DATA_FORMAT_TEXT;})
+ | utils::valueOrElse([]() {return false;});
+ bool is_unique = context.getProperty<bool>(UniqueFlowFiles) |
utils::valueOrElse([] { return true; });
+
+ auto custom_text_without_evaluation = context.getProperty(CustomText);
+ bool has_custom_text = custom_text_without_evaluation.has_value() &&
!custom_text_without_evaluation->empty();
+
+ context.getProperty(FileSize, file_size_);
+ context.getProperty(BatchSize, batch_size_);
+
+ mode_ = getMode(is_unique, is_text, has_custom_text, file_size_);
+
+ if (!isUnique(mode_)) {
+ non_unique_data_.resize(gsl::narrow<size_t>(file_size_));
+ generateData(non_unique_data_, isText(mode_));
}
+ logger_->log_trace("GenerateFlowFile is configured in {} mode",
magic_enum::enum_name(mode_));
+ if (mode_ != Mode::CustomText && has_custom_text)
+ logger_->log_warn("Custom Text property is set, but not used!");
+}
+
+// The custom text has to be reevaluated once per batch
+void GenerateFlowFile::refreshNonUniqueData(core::ProcessContext& context) {
+ if (mode_ != Mode::CustomText)
+ return;
std::string custom_text;
context.getProperty(CustomText, custom_text, nullptr);
- if (!custom_text.empty()) {
- if (textData_ && !uniqueFlowFile_) {
- data_.assign(custom_text.begin(), custom_text.end());
- return;
- } else {
- logger_->log_warn("Custom Text property is set, but not used!");
- }
- }
-
- if (!uniqueFlowFile_) {
- data_.resize(gsl::narrow<size_t>(fileSize_));
- generateData(data_, textData_);
- }
+ non_unique_data_.assign(custom_text.begin(), custom_text.end());
}
-void GenerateFlowFile::onTrigger(core::ProcessContext&, core::ProcessSession&
session) {
- for (uint64_t i = 0; i < batchSize_; i++) {
+void GenerateFlowFile::onTrigger(core::ProcessContext& context,
core::ProcessSession& session) {
+ refreshNonUniqueData(context);
+ for (uint64_t i = 0; i < batch_size_; i++) {
// For each batch
- std::shared_ptr<core::FlowFile> flowFile = session.create();
- if (!flowFile) {
+ std::shared_ptr<core::FlowFile> flow_file = session.create();
Review Comment:
The comment on line 115 is confusing. The whole for loop is one batch, and each
iteration creates one flow file, but the comment suggests that each iteration is a batch.
##########
extensions/standard-processors/processors/GenerateFlowFile.cpp:
##########
@@ -57,58 +56,78 @@ void generateData(std::vector<char>& data, bool textData =
false) {
}
}
-void GenerateFlowFile::onSchedule(core::ProcessContext& context,
core::ProcessSessionFactory&) {
- if (context.getProperty(FileSize.name, fileSize_)) {
- logger_->log_trace("File size is configured to be {}", fileSize_);
- }
+GenerateFlowFile::Mode GenerateFlowFile::getMode(bool is_unique, bool is_text,
bool has_custom_text, uint64_t file_size) {
+ if (is_text && !is_unique && has_custom_text)
+ return Mode::CustomText;
- if (context.getProperty(BatchSize.name, batchSize_)) {
- logger_->log_trace("Batch size is configured to be {}", batchSize_);
- }
+ if (file_size == 0)
+ return Mode::Empty;
- std::string value;
- if (context.getProperty(DataFormat.name, value)) {
- textData_ = (value == GenerateFlowFile::DATA_FORMAT_TEXT);
+ if (is_unique) {
+ if (is_text)
+ return Mode::UniqueText;
+ else
+ return Mode::UniqueByte;
+ } else {
+ if (is_text)
+ return Mode::NotUniqueText;
+ else
+ return Mode::NotUniqueByte;
}
- if (context.getProperty(UniqueFlowFiles.name, uniqueFlowFile_)) {
- logger_->log_trace("Unique Flow files is configured to be {}",
uniqueFlowFile_);
+}
+
+void GenerateFlowFile::onSchedule(core::ProcessContext& context,
core::ProcessSessionFactory&) {
+ bool is_text = context.getProperty<std::string>(DataFormat)
+ | utils::transform([](const std::string& data_format) { return
data_format == DATA_FORMAT_TEXT;})
+ | utils::valueOrElse([]() {return false;});
+ bool is_unique = context.getProperty<bool>(UniqueFlowFiles) |
utils::valueOrElse([] { return true; });
+
+ auto custom_text_without_evaluation = context.getProperty(CustomText);
+ bool has_custom_text = custom_text_without_evaluation.has_value() &&
!custom_text_without_evaluation->empty();
+
+ context.getProperty(FileSize, file_size_);
+ context.getProperty(BatchSize, batch_size_);
+
+ mode_ = getMode(is_unique, is_text, has_custom_text, file_size_);
+
+ if (!isUnique(mode_)) {
+ non_unique_data_.resize(gsl::narrow<size_t>(file_size_));
+ generateData(non_unique_data_, isText(mode_));
}
+ logger_->log_trace("GenerateFlowFile is configured in {} mode",
magic_enum::enum_name(mode_));
+ if (mode_ != Mode::CustomText && has_custom_text)
+ logger_->log_warn("Custom Text property is set, but not used!");
+}
+
+// The custom text has to be reevaluated once per batch
+void GenerateFlowFile::refreshNonUniqueData(core::ProcessContext& context) {
+ if (mode_ != Mode::CustomText)
+ return;
std::string custom_text;
context.getProperty(CustomText, custom_text, nullptr);
- if (!custom_text.empty()) {
- if (textData_ && !uniqueFlowFile_) {
- data_.assign(custom_text.begin(), custom_text.end());
- return;
- } else {
- logger_->log_warn("Custom Text property is set, but not used!");
- }
- }
-
- if (!uniqueFlowFile_) {
- data_.resize(gsl::narrow<size_t>(fileSize_));
- generateData(data_, textData_);
- }
+ non_unique_data_.assign(custom_text.begin(), custom_text.end());
}
-void GenerateFlowFile::onTrigger(core::ProcessContext&, core::ProcessSession&
session) {
- for (uint64_t i = 0; i < batchSize_; i++) {
+void GenerateFlowFile::onTrigger(core::ProcessContext& context,
core::ProcessSession& session) {
+ refreshNonUniqueData(context);
+ for (uint64_t i = 0; i < batch_size_; i++) {
// For each batch
- std::shared_ptr<core::FlowFile> flowFile = session.create();
- if (!flowFile) {
+ std::shared_ptr<core::FlowFile> flow_file = session.create();
+ if (!flow_file) {
logger_->log_error("Failed to create flowfile!");
return;
}
- if (uniqueFlowFile_) {
- std::vector<char> data(gsl::narrow<size_t>(fileSize_));
- if (fileSize_ > 0) {
- generateData(data, textData_);
- }
- session.writeBuffer(flowFile, data);
+ if (mode_ == Mode::Empty) {
+ // noop
Review Comment:
Shouldn't we emit an empty flow file here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]