MINIFICPP-434 Added ExecuteSQL(ite) implementation, tests, and docs

This closes #284.

Signed-off-by: Marc Parisi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/07edfb16
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/07edfb16
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/07edfb16

Branch: refs/heads/master
Commit: 07edfb162f11bf0cc3b3edcbc8dccdb567f36ada
Parents: e1ff861
Author: Andrew I. Christianson <[email protected]>
Authored: Sun Mar 18 12:34:52 2018 -0400
Committer: Marc Parisi <[email protected]>
Committed: Tue Mar 20 20:47:32 2018 -0400

----------------------------------------------------------------------
 PROCESSORS.md                               |  38 +++-
 README.md                                   |   1 +
 extensions/sqlite/ExecuteSQL.cpp            | 190 +++++++++++++++++
 extensions/sqlite/ExecuteSQL.h              |  83 ++++++++
 extensions/sqlite/SQLiteConnection.h        |   8 +
 libminifi/test/sqlite-tests/SQLiteTests.cpp | 255 +++++++++++++++++++++++
 6 files changed, 574 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/07edfb16/PROCESSORS.md
----------------------------------------------------------------------
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 9c6ef23..8fb870a 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -22,6 +22,7 @@
 - [ConsumeMQTT](#consumemqtt)
 - [ExecuteProcess](#executeprocess)
 - [ExecuteScript](#executescript)
+- [ExecuteSQL](#executesql)
 - [ExtractText](#extracttext)
 - [FocusArchiveEntry](#focusarchiveentry)
 - [GenerateFlowFile](#generateflowfile)
@@ -248,6 +249,42 @@ default values, and whether a property supports the NiFi Expression Language.
 | success | Script successes |
 | failure | Script failures |
 
+## ExecuteSQL
+
+### Description
+
+Executes the provided SQL query. Query result rows are output as new flow
+files with attribute keys equal to result column names and values equal to
+result values. There will be one output FlowFile per result row. This processor
+can be scheduled to run using the standard timer-based scheduling methods, or
+it can be triggered by an incoming FlowFile. If it is triggered by an incoming
+FlowFile, then attributes of that FlowFile will be available when evaluating
+the query.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
+properties (not in bold) are considered optional. The table also indicates any
+default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+| **Connection URL** | | | The database connection URL (e.g. `sqlite://filename.db?cache=shared`) **Only SQLite is currently supported** |
+| SQL Statement | | | The SQL query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. If this property is specified, it will be used regardless of the content of incoming FlowFiles. If this property is empty, the content of the incoming FlowFile is expected to contain a valid SQL query, to be issued by the processor to the database. Note that Expression Language is not evaluated for FlowFile contents.<br>**Supports Expression Language: true** |
+
+### Relationships
+
+| Name | Description |
+| - | - |
+| original | Upon successful query execution, the original FlowFile is routed here. |
+| success | For each SQL result row, a FlowFile will be written to this relationship. Attributes will be written to result FlowFiles having the same names and values as present in result columns. |
+| failure | A FlowFile is routed to this relationship if the SQL statement cannot be executed and retrying the operation will also fail, such as an invalid query or an integrity constraint violation. |
+
+### Reads Attributes
+| Name | Description |
+| - | - |
+| sql.args.N.value | Incoming FlowFiles are expected to be parametrized SQL statements. The values of the parameters are specified as `sql.args.1.value`, `sql.args.2.value`, `sql.args.3.value`, and so on. The type of the `sql.args.1.value` parameter is specified by the `sql.args.1.type` attribute. |
+
 ## GetFile
 
 ### Description
@@ -545,7 +582,6 @@ default values, and whether a property supports the NiFi Expression Language.
 | - | - |
 | sql.args.N.value | Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as `sql.args.1.value`, `sql.args.2.value`, `sql.args.3.value`, and so on. The type of the `sql.args.1.value` Parameter is specified by the `sql.args.1.type` attribute. |
 
-
 ## RouteOnAttribute
 
 ### Description
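
The `sql.args.N.value` convention documented above can be illustrated with a small standalone sketch. It assumes a plain `std::map` in place of a real FlowFile, and `collect_sql_args` is a hypothetical helper name that does not appear in the commit. Like the loop in `ExecuteSQL::onTrigger`, it stops at the first missing index, so a gap in the numbering hides any later arguments.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical helper (not part of the commit): gathers positional SQL
// arguments from an attribute map standing in for a FlowFile.
std::vector<std::string> collect_sql_args(
    const std::map<std::string, std::string> &attributes) {
  std::vector<std::string> args;
  // There can never be more arguments than attributes, so the map size is a
  // safe upper bound for the 1-based index.
  for (std::size_t i = 1; i <= attributes.size(); i++) {
    const std::string key = "sql.args." + std::to_string(i) + ".value";
    auto it = attributes.find(key);
    if (it == attributes.end()) {
      break;  // the first missing index ends the scan
    }
    args.push_back(it->second);
  }
  return args;
}
```

Each collected value would then be bound, in order, to the matching `?` placeholder of the prepared statement; the processor in this commit binds every value as text.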

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/07edfb16/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 46efc37..c6a550f 100644
--- a/README.md
+++ b/README.md
@@ -55,6 +55,7 @@ MiNiFi - C++ supports the following processors:
 * [ConsumeMQTT](PROCESSORS.md#consumeMQTT)
 * [ExecuteProcess](PROCESSORS.md#executeprocess)
 * [ExecuteScript](PROCESSORS.md#executescript)
+* [ExecuteSQL](PROCESSORS.md#executesql)
 * [ExtractText](PROCESSORS.md#extracttext)
 * [FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)
 * [GenerateFlowFile](PROCESSORS.md#generateflowfile)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/07edfb16/extensions/sqlite/ExecuteSQL.cpp
----------------------------------------------------------------------
diff --git a/extensions/sqlite/ExecuteSQL.cpp b/extensions/sqlite/ExecuteSQL.cpp
new file mode 100644
index 0000000..a151bdc
--- /dev/null
+++ b/extensions/sqlite/ExecuteSQL.cpp
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ExecuteSQL.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property ExecuteSQL::ConnectionURL(  // NOLINT
+    "Connection URL",
+    "The database URL to connect to",
+    "");
+core::Property ExecuteSQL::SQLStatement(  // NOLINT
+    "SQL Statement",
+    "The SQL statement to execute",
+    "");
+
+core::Relationship ExecuteSQL::Success(  // NOLINT
+    "success",
+    "After a successful SQL execution, result FlowFiles are sent here");
+core::Relationship ExecuteSQL::Original(  // NOLINT
+    "original",
+    "The original FlowFile is sent here");
+core::Relationship ExecuteSQL::Failure(  // NOLINT
+    "failure",
+    "Failures which will not work if retried");
+
+void ExecuteSQL::initialize() {
+  std::set<core::Property> properties;
+  properties.insert(ConnectionURL);
+  properties.insert(SQLStatement);
+  setSupportedProperties(std::move(properties));
+
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  relationships.insert(Original);
+  relationships.insert(Failure);
+  setSupportedRelationships(std::move(relationships));
+}
+
+void ExecuteSQL::onSchedule(core::ProcessContext *context,
+                            core::ProcessSessionFactory *sessionFactory) {
+  context->getProperty(ConnectionURL.getName(), db_url_);
+
+  if (db_url_.empty()) {
+    logger_->log_error("Invalid Connection URL");
+  }
+
+  context->getProperty(SQLStatement.getName(), sql_);
+}
+
+void ExecuteSQL::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
+                           const std::shared_ptr<core::ProcessSession> &session) {
+  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->get());
+
+  try {
+    // Use an existing context, if one is available
+    std::shared_ptr<minifi::sqlite::SQLiteConnection> db;
+
+    if (conn_q_.try_dequeue(db)) {
+      logger_->log_debug("Using available SQLite connection");
+    }
+
+    if (!db) {
+      logger_->log_info("Creating new SQLite connection");
+      if (db_url_.substr(0, 9) == "sqlite://") {
+        db = std::make_shared<minifi::sqlite::SQLiteConnection>(db_url_.substr(9));
+      } else {
+        std::stringstream err_msg;
+        err_msg << "Connection URL '" << db_url_ << "' is unsupported";
+        logger_->log_error(err_msg.str().c_str());
+        throw std::runtime_error("Connection Error");
+      }
+    }
+
+    auto dynamic_sql = std::make_shared<std::string>();
+
+    if (flow_file) {
+      if (sql_.empty()) {
+        // SQL is not defined as a property, so get SQL from the file content
+        SQLReadCallback cb(dynamic_sql);
+        session->read(flow_file, &cb);
+      } else {
+        // SQL is defined as a property, so get the property dynamically w/ EL support
+        context->getProperty(SQLStatement.getName(), *dynamic_sql, flow_file);
+      }
+    }
+
+    auto stmt = flow_file
+                ? db->prepare(*dynamic_sql)
+                : db->prepare(sql_);
+
+    if (flow_file) {
+      for (int i = 1; i < INT_MAX; i++) {
+        std::string val;
+        std::stringstream val_key;
+        val_key << "sql.args." << i << ".value";
+
+        if (!flow_file->getAttribute(val_key.str(), val)) {
+          break;
+        }
+
+        stmt.bind_text(i, val);
+      }
+    }
+
+    stmt.step();
+
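+    if (!stmt.is_ok()) {
+      logger_->log_error("SQL statement execution failed: %s", db->errormsg());
+      // There may be no incoming FlowFile (timer-driven), and a failed
+      // FlowFile must not also be routed to Original further down
+      if (flow_file) {
+        session->transfer(flow_file, Failure);
+      }
+      return;
+    }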
+
+    auto num_cols = stmt.column_count();
+    std::vector<std::string> col_names;
+
+    for (int i = 0; i < num_cols; i++) {
+      col_names.emplace_back(stmt.column_name(i));
+    }
+
+    while (stmt.is_ok() && !stmt.is_done()) {
+      auto result_ff = session->create();
+
+      for (int i = 0; i < num_cols; i++) {
+        result_ff->addAttribute(col_names[i], stmt.column_text(i));
+      }
+
+      session->transfer(result_ff, Success);
+      stmt.step();
+    }
+
+    if (flow_file) {
+      session->transfer(flow_file, Original);
+    }
+
+    // Make connection available for use again
+    if (conn_q_.size_approx() < getMaxConcurrentTasks()) {
+      logger_->log_debug("Releasing SQLite connection");
+      conn_q_.enqueue(db);
+    } else {
+      logger_->log_info("Destroying SQLite connection because it is no longer needed");
+    }
+  } catch (std::exception &exception) {
+    logger_->log_error("Caught Exception %s", exception.what());
+    if (flow_file) {
+      session->transfer(flow_file, Failure);
+    }
+    this->yield();
+  } catch (...) {
+    logger_->log_error("Caught Exception");
+    if (flow_file) {
+      session->transfer(flow_file, Failure);
+    }
+    this->yield();
+  }
+}
+
+int64_t ExecuteSQL::SQLReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  sql_->resize(stream->getSize());
+  auto num_read = static_cast<uint64_t >(stream->readData(reinterpret_cast<uint8_t *>(&(*sql_)[0]),
+                                                          static_cast<int>(stream->getSize())));
+
+  if (num_read != stream->getSize()) {
+    throw std::runtime_error("SQLReadCallback failed to fully read flow file input stream");
+  }
+
+  return num_read;
+}
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
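
The connection URL handling in `onTrigger` above relies on the literal length `9` for the `sqlite://` prefix. A minimal sketch of the same check, written so that the prefix length is derived from the prefix itself, might look like the following. `sqlite_path_from_url` is a hypothetical name used only for illustration and is not part of the commit.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper (not part of the commit): if `url` starts with
// "sqlite://", stores the remainder in `*path` and returns true.
// Otherwise leaves `*path` untouched and returns false.
bool sqlite_path_from_url(const std::string &url, std::string *path) {
  static const std::string prefix = "sqlite://";
  if (url.compare(0, prefix.size(), prefix) != 0) {
    return false;  // unsupported scheme, or URL shorter than the prefix
  }
  *path = url.substr(prefix.size());
  return true;
}
```

Whatever follows the prefix, including any query string such as `?cache=shared`, is returned unchanged, mirroring what `db_url_.substr(9)` passes to `SQLiteConnection` in the diff.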

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/07edfb16/extensions/sqlite/ExecuteSQL.h
----------------------------------------------------------------------
diff --git a/extensions/sqlite/ExecuteSQL.h b/extensions/sqlite/ExecuteSQL.h
new file mode 100644
index 0000000..46cff1d
--- /dev/null
+++ b/extensions/sqlite/ExecuteSQL.h
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_EXECUTESQL_H
+#define NIFI_MINIFI_CPP_EXECUTESQL_H
+
+#include <core/Resource.h>
+#include <core/Processor.h>
+
+#include <concurrentqueue.h>
+
+#include "SQLiteConnection.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class ExecuteSQL : public core::Processor {
+ public:
+  explicit ExecuteSQL(const std::string &name, uuid_t uuid = nullptr)
+      : Processor(name, uuid),
+        logger_(logging::LoggerFactory<ExecuteSQL>::getLogger()) {
+  }
+
+  static core::Property ConnectionURL;
+  static core::Property SQLStatement;
+
+  static core::Relationship Success;
+  static core::Relationship Original;
+  static core::Relationship Failure;
+
+  void initialize() override;
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override {
+    logger_->log_error("onTrigger invocation with raw pointers is not implemented");
+  }
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
+                 const std::shared_ptr<core::ProcessSession> &session) override;
+
+  class SQLReadCallback : public InputStreamCallback {
+   public:
+    explicit SQLReadCallback(std::shared_ptr<std::string> sql)
+        : sql_(std::move(sql)) {
+    }
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+
+   private:
+    std::shared_ptr<std::string> sql_;
+  };
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+  moodycamel::ConcurrentQueue<std::shared_ptr<minifi::sqlite::SQLiteConnection>> conn_q_;
+
+  std::string db_url_;
+  std::string sql_;
+};
+
+REGISTER_RESOURCE(ExecuteSQL); // NOLINT
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_EXECUTESQL_H
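
The `conn_q_` member declared above is used in `onTrigger` as a small connection pool: a connection is dequeued if one is idle, created otherwise, and put back only while the number of idle connections is below the maximum number of concurrent tasks. The sketch below shows that reuse pattern in isolation. It deliberately swaps the lock-free `moodycamel::ConcurrentQueue` for a `std::mutex` and `std::deque` so that it compiles without third-party headers, and `BoundedPool` is a hypothetical class name that does not exist in the commit.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical illustration (not part of the commit): a pool that keeps at
// most `max_idle` idle objects for reuse.
template <typename T>
class BoundedPool {
 public:
  explicit BoundedPool(std::size_t max_idle) : max_idle_(max_idle) {}

  // Returns an idle object, or nullptr when the caller must create one.
  std::shared_ptr<T> try_acquire() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (idle_.empty()) {
      return nullptr;
    }
    std::shared_ptr<T> obj = std::move(idle_.front());
    idle_.pop_front();
    return obj;
  }

  // Keeps the object for reuse if there is room. Returns false when the pool
  // is full, in which case the object is destroyed once the last reference
  // to it goes away.
  bool release(std::shared_ptr<T> obj) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (idle_.size() >= max_idle_) {
      return false;
    }
    idle_.push_back(std::move(obj));
    return true;
  }

 private:
  std::mutex mutex_;
  std::deque<std::shared_ptr<T>> idle_;
  std::size_t max_idle_;
};
```

Unlike `size_approx()` on the concurrent queue, the size check here is exact because it happens under the lock; the trade-off is contention on the mutex when many tasks acquire and release at once.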

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/07edfb16/extensions/sqlite/SQLiteConnection.h
----------------------------------------------------------------------
diff --git a/extensions/sqlite/SQLiteConnection.h b/extensions/sqlite/SQLiteConnection.h
index ed1f10c..0b6e325 100644
--- a/extensions/sqlite/SQLiteConnection.h
+++ b/extensions/sqlite/SQLiteConnection.h
@@ -183,6 +183,14 @@ class SQLiteStatement {
     return SQLITE_NULL == sqlite3_column_type(stmt_, col);
   }
 
+  std::string column_name(int col) {
+    return std::string(sqlite3_column_name(stmt_, col));
+  }
+
+  int column_count() {
+    return sqlite3_column_count(stmt_);
+  }
+
   void reset() {
     sqlite3_reset(stmt_);
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/07edfb16/libminifi/test/sqlite-tests/SQLiteTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/sqlite-tests/SQLiteTests.cpp b/libminifi/test/sqlite-tests/SQLiteTests.cpp
index 6d83351..5c7ea06 100644
--- a/libminifi/test/sqlite-tests/SQLiteTests.cpp
+++ b/libminifi/test/sqlite-tests/SQLiteTests.cpp
@@ -25,6 +25,7 @@
 #include <GenerateFlowFile.h>
 #include <UpdateAttribute.h>
 #include <LogAttribute.h>
+#include <ExecuteSQL.h>
 
 #include "../TestBase.h"
 
@@ -216,3 +217,257 @@ TEST_CASE("Test Put Content", "[PutSQLPutContent]") {  // NOLINT
     REQUIRE("fdsa" == stmt.column_text(1));
   }
 }
+
+TEST_CASE("Test Exec", "[ExecuteSQL]") {  // NOLINT
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<processors::ExecuteSQL>();
+
+  auto plan = testController.createPlan();
+  auto repo = std::make_shared<TestRepository>();
+
+  // Define directory for test db
+  std::string test_dir("/tmp/gt.XXXXXX");
+  REQUIRE(testController.createTempDirectory(&test_dir[0]) != nullptr);
+
+  // Define test db file
+  std::string test_db(test_dir);
+  test_db.append("/test.db");
+
+  // Create test db
+  {
+    minifi::sqlite::SQLiteConnection db(test_db);
+    auto stmt = db.prepare("CREATE TABLE test_table (int_col INTEGER, text_col TEXT);");
+    stmt.step();
+    REQUIRE(stmt.is_ok());
+
+    // Insert test data
+    auto stmt2 = db.prepare("INSERT INTO test_table (int_col, text_col) VALUES (42, 'asdf');");
+    stmt2.step();
+    REQUIRE(stmt2.is_ok());
+  }
+
+  // Build MiNiFi processing graph
+  auto exec = plan->addProcessor(
+      "ExecuteSQL",
+      "ExecuteSQL");
+  plan->setProperty(
+      exec,
+      "Connection URL",
+      "sqlite://" + test_db);
+  plan->setProperty(
+      exec,
+      "SQL Statement",
+      "SELECT * FROM test_table;");
+  auto log = plan->addProcessor(
+      "LogAttribute",
+      "Log",
+      core::Relationship("success", "description"),
+      true);
+
+  plan->runNextProcessor();  // Exec
+  plan->runNextProcessor();  // Log
+
+  // Verify output state
+  REQUIRE(LogTestController::getInstance().contains("key:int_col value:42"));
+  REQUIRE(LogTestController::getInstance().contains("key:text_col value:asdf"));
+}
+
+TEST_CASE("Test Exec 2", "[ExecuteSQL2]") {  // NOLINT
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
+  LogTestController::getInstance().setTrace<processors::GenerateFlowFile>();
+  LogTestController::getInstance().setTrace<processors::ExecuteSQL>();
+
+  auto plan = testController.createPlan();
+  auto repo = std::make_shared<TestRepository>();
+
+  // Define directory for test db
+  std::string test_dir("/tmp/gt.XXXXXX");
+  REQUIRE(testController.createTempDirectory(&test_dir[0]) != nullptr);
+
+  // Define test db file
+  std::string test_db(test_dir);
+  test_db.append("/test.db");
+
+  // Create test db
+  {
+    minifi::sqlite::SQLiteConnection db(test_db);
+    auto stmt = db.prepare("CREATE TABLE test_table (id_col INTEGER, int_col INTEGER, text_col TEXT);");
+    stmt.step();
+    REQUIRE(stmt.is_ok());
+
+    // Insert test data
+    {
+      auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (1, 33, 'aaaa');");
+      ins.step();
+      REQUIRE(ins.is_ok());
+    }
+    {
+      auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (2, 42, 'bbbb');");
+      ins.step();
+      REQUIRE(ins.is_ok());
+    }
+    {
+      auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (3, 24, 'cccc');");
+      ins.step();
+      REQUIRE(ins.is_ok());
+    }
+  }
+
+  // Build MiNiFi processing graph
+  auto generate = plan->addProcessor(
+      "GenerateFlowFile",
+      "Generate");
+  auto update = plan->addProcessor(
+      "UpdateAttribute",
+      "Update",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      update,
+      "sql.args.1.value",
+      "2",
+      true);
+  auto exec = plan->addProcessor(
+      "ExecuteSQL",
+      "ExecuteSQL",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      exec,
+      "Connection URL",
+      "sqlite://" + test_db);
+  plan->setProperty(
+      exec,
+      "SQL Statement",
+      "SELECT * FROM test_table WHERE id_col = ?;");
+  std::set<core::Relationship> auto_term_rels;
+  core::Relationship original("original", "");
+  auto_term_rels.insert(original);
+  exec->setAutoTerminatedRelationships(auto_term_rels);
+  auto log = plan->addProcessor(
+      "LogAttribute",
+      "Log",
+      core::Relationship("success", "description"),
+      true);
+
+  plan->runNextProcessor();  // Gen
+  plan->runNextProcessor();  // Update
+  plan->runNextProcessor();  // Exec
+  plan->runNextProcessor();  // Log
+
+  // Verify output state
+  REQUIRE(LogTestController::getInstance().contains("key:id_col value:2"));
+  REQUIRE(LogTestController::getInstance().contains("key:int_col value:42"));
+  REQUIRE(LogTestController::getInstance().contains("key:text_col value:bbbb"));
+}
+
+TEST_CASE("Test Exec 3", "[ExecuteSQL3]") {  // NOLINT
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::GetFile>();
+  LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<processors::ExecuteSQL>();
+
+  auto plan = testController.createPlan();
+  auto repo = std::make_shared<TestRepository>();
+
+  // Define directory for test db
+  std::string test_dir("/tmp/gt.XXXXXX");
+  REQUIRE(testController.createTempDirectory(&test_dir[0]) != nullptr);
+
+  // Define test db file
+  std::string test_db(test_dir);
+  test_db.append("/test.db");
+
+  // Define directory for test input file
+  std::string test_in_dir("/tmp/gt.XXXXXX");
+  REQUIRE(testController.createTempDirectory(&test_in_dir[0]) != nullptr);
+
+  // Define test input file
+  std::string test_file(test_in_dir);
+  test_file.append("/test.in");
+
+  // Write test SQL content
+  {
+    std::ofstream os(test_file);
+    os << "SELECT text_col FROM test_table WHERE id_col = ?;";
+  }
+
+  // Create test db
+  {
+    minifi::sqlite::SQLiteConnection db(test_db);
+    auto stmt = db.prepare("CREATE TABLE test_table (id_col INTEGER, int_col INTEGER, text_col TEXT);");
+    stmt.step();
+    REQUIRE(stmt.is_ok());
+
+    // Insert test data
+    {
+      auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (1, 33, 'aaaa');");
+      ins.step();
+      REQUIRE(ins.is_ok());
+    }
+    {
+      auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (2, 42, 'bbbb');");
+      ins.step();
+      REQUIRE(ins.is_ok());
+    }
+    {
+      auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (3, 24, 'cccc');");
+      ins.step();
+      REQUIRE(ins.is_ok());
+    }
+  }
+
+  // Build MiNiFi processing graph
+  auto get_file = plan->addProcessor(
+      "GetFile",
+      "Get");
+  plan->setProperty(
+      get_file,
+      processors::GetFile::Directory.getName(), test_in_dir);
+  auto update = plan->addProcessor(
+      "UpdateAttribute",
+      "Update",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      update,
+      "sql.args.1.value",
+      "2",
+      true);
+  auto exec = plan->addProcessor(
+      "ExecuteSQL",
+      "ExecuteSQL",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      exec,
+      "Connection URL",
+      "sqlite://" + test_db);
+  std::set<core::Relationship> auto_term_rels;
+  core::Relationship original("original", "");
+  auto_term_rels.insert(original);
+  exec->setAutoTerminatedRelationships(auto_term_rels);
+  auto log = plan->addProcessor(
+      "LogAttribute",
+      "Log",
+      core::Relationship("success", "description"),
+      true);
+
+  plan->runNextProcessor();  // Get
+  plan->runNextProcessor();  // Update
+  plan->runNextProcessor();  // Exec
+  plan->runNextProcessor();  // Log
+
+  // Verify output state
+  REQUIRE(LogTestController::getInstance().contains("key:text_col value:bbbb"));
+}
