adam-markovics commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r599662804
##########
File path: extensions/sql/data/DatabaseConnectors.h
##########
@@ -45,8 +45,12 @@ class Statement {
virtual ~Statement() = default;
- soci::rowset<soci::row> execute() {
- return session_.prepare << query_;
+ soci::rowset<soci::row> execute(const std::vector<std::string>& args = {}) {
+ auto stmt = session_.prepare << query_;
+ for (auto& arg : args) {
+ stmt, soci::use(arg);
Review comment:
Is this comma operator necessary here?
##########
File path: extensions/sql/data/MaxCollector.h
##########
@@ -31,26 +33,21 @@ namespace minifi {
namespace sql {
class MaxCollector: public SQLRowSubscriber {
- void beginProcessRow() override {}
-
- void endProcessRow() override {
- if (columnsVerified_) {
- return;
+ void beginProcessBatch() override {}
+ void endProcessBatch(Progress progress) override {
+ if (progress == Progress::DONE) {
+ updateMapState();
}
-
- if (countColumns_ != mapState_.size())
- throw minifi::Exception(PROCESSOR_EXCEPTION, "MaxCollector: Column(s) '"
+ maxValueColumnNames_ + "' are not found in the columns of '" + selectQuery_ +
"' result.");
-
- columnsVerified_ = true;
}
+ void beginProcessRow() override {}
+ void endProcessRow() override {}
- void processColumnName(const std::string& name) override {
- if (columnsVerified_) {
- return;
- }
-
- if (mapState_.count(name)) {
- countColumns_++;
+ void processColumnNames(const std::vector<std::string>& names) override {
+ for (auto& expected : state_) {
Review comment:
I suggest const auto &
##########
File path: extensions/sql/data/SQLRowsetProcessor.cpp
##########
@@ -20,51 +20,61 @@
#include "Exception.h"
#include "Utils.h"
+#include "utils/StringUtils.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace sql {
-SQLRowsetProcessor::SQLRowsetProcessor(const soci::rowset<soci::row>& rowset,
const std::vector<SQLRowSubscriber*>& rowSubscribers)
- : rowset_(rowset), rowSubscribers_(rowSubscribers) {
+SQLRowsetProcessor::SQLRowsetProcessor(const soci::rowset<soci::row>& rowset,
std::vector<std::reference_wrapper<SQLRowSubscriber>> row_subscribers)
+ : rowset_(rowset), row_subscribers_(std::move(row_subscribers)) {
iter_ = rowset_.begin();
}
size_t SQLRowsetProcessor::process(size_t max) {
size_t count = 0;
+ for (const auto& subscriber : row_subscribers_) {
+ subscriber.get().beginProcessBatch();
+ }
+
for (; iter_ != rowset_.end(); ) {
addRow(*iter_, count);
iter_++;
count++;
- totalCount_++;
if (max > 0 && count >= max) {
break;
}
}
+ for (const auto& subscriber : row_subscribers_) {
+ subscriber.get().endProcessBatch(count == 0 ?
SQLRowSubscriber::Progress::DONE : SQLRowSubscriber::Progress::CONTINUE);
+ }
+
return count;
}
void SQLRowsetProcessor::addRow(const soci::row& row, size_t rowCount) {
- for (const auto& pRowSubscriber : rowSubscribers_) {
- pRowSubscriber->beginProcessRow();
+ for (const auto& subscriber : row_subscribers_) {
+ subscriber.get().beginProcessRow();
}
if (rowCount == 0) {
+ std::vector<std::string> column_names;
Review comment:
You can add a reserve() here since you know the number of elements to be
pushed. But it's not necessary.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]