adamdebreceni commented on a change in pull request #1004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r600240781



##########
File path: extensions/sql/data/DatabaseConnectors.h
##########
@@ -45,8 +45,12 @@ class Statement {
 
   virtual ~Statement() = default;
 
-  soci::rowset<soci::row> execute() {
-    return session_.prepare << query_;
+  soci::rowset<soci::row> execute(const std::vector<std::string>& args = {}) {
+    auto stmt = session_.prepare << query_;
+    for (auto& arg : args) {
+      stmt, soci::use(arg);

Review comment:
       It seems to me that it is: it binds the arguments of a prepared
statement, like `"SELECT * FROM table WHERE id = ?".bind(4)`, but the interface
is unfortunate. I added an explicit comma operator call and a comment
explaining what is happening.

##########
File path: extensions/sql/data/MaxCollector.h
##########
@@ -31,26 +33,21 @@ namespace minifi {
 namespace sql {
 
 class MaxCollector: public SQLRowSubscriber {
-  void beginProcessRow() override {}
-
-  void endProcessRow() override {
-    if (columnsVerified_) {
-      return;
+  void beginProcessBatch() override {}
+  void endProcessBatch(Progress progress) override {
+    if (progress == Progress::DONE) {
+      updateMapState();
     }
-
-    if (countColumns_ != mapState_.size())
-      throw minifi::Exception(PROCESSOR_EXCEPTION, "MaxCollector: Column(s) '" 
+ maxValueColumnNames_ + "' are not found in the columns of '" + selectQuery_ + 
"' result.");
-
-    columnsVerified_ = true;
   }
+  void beginProcessRow() override {}
+  void endProcessRow() override {}
 
-  void processColumnName(const std::string& name) override {
-    if (columnsVerified_) {
-      return;
-    }
-
-    if (mapState_.count(name)) {
-      countColumns_++;
+  void processColumnNames(const std::vector<std::string>& names) override {
+    for (auto& expected : state_) {

Review comment:
       done

##########
File path: extensions/sql/data/SQLRowsetProcessor.cpp
##########
@@ -20,51 +20,61 @@
 
 #include "Exception.h"
 #include "Utils.h"
+#include "utils/StringUtils.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace sql {
 
-SQLRowsetProcessor::SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, 
const std::vector<SQLRowSubscriber*>& rowSubscribers)
-  : rowset_(rowset), rowSubscribers_(rowSubscribers) {
+SQLRowsetProcessor::SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, 
std::vector<std::reference_wrapper<SQLRowSubscriber>> row_subscribers)
+  : rowset_(rowset), row_subscribers_(std::move(row_subscribers)) {
   iter_ = rowset_.begin();
 }
 
 size_t SQLRowsetProcessor::process(size_t max) {
   size_t count = 0;
 
+  for (const auto& subscriber : row_subscribers_) {
+    subscriber.get().beginProcessBatch();
+  }
+
   for (; iter_ != rowset_.end(); ) {
     addRow(*iter_, count);
     iter_++;
     count++;
-    totalCount_++;
     if (max > 0 && count >= max) {
       break;
     }
   }
 
+  for (const auto& subscriber : row_subscribers_) {
+    subscriber.get().endProcessBatch(count == 0 ? 
SQLRowSubscriber::Progress::DONE : SQLRowSubscriber::Progress::CONTINUE);
+  }
+
   return count;
 }
 
 void SQLRowsetProcessor::addRow(const soci::row& row, size_t rowCount) {
-  for (const auto& pRowSubscriber : rowSubscribers_) {
-    pRowSubscriber->beginProcessRow();
+  for (const auto& subscriber : row_subscribers_) {
+    subscriber.get().beginProcessRow();
   }
 
   if (rowCount == 0) {
+    std::vector<std::string> column_names;

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to