MINIFICPP-433 Added PutSQL(ite) implementation, tests, docs, and license

Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/e1ff861a
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/e1ff861a
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/e1ff861a

Branch: refs/heads/master
Commit: e1ff861aaac12c345c901e0496b7386de9123375
Parents: 8a66f75
Author: Andrew I. Christianson <[email protected]>
Authored: Sat Mar 17 15:15:17 2018 -0400
Committer: Marc Parisi <[email protected]>
Committed: Tue Mar 20 20:42:45 2018 -0400

----------------------------------------------------------------------
 CMakeLists.txt                              |      5 +
 LICENSE                                     |     26 +
 PROCESSORS.md                               |     42 +-
 README.md                                   |      1 +
 extensions/sqlite/CMakeLists.txt            |     60 +
 extensions/sqlite/PutSQL.cpp                |    190 +
 extensions/sqlite/PutSQL.h                  |     85 +
 extensions/sqlite/SQLiteConnection.h        |    262 +
 libminifi/test/sqlite-tests/CMakeLists.txt  |     40 +
 libminifi/test/sqlite-tests/SQLiteTests.cpp |    218 +
 thirdparty/sqlite/CMakeLists.txt            |     22 +
 thirdparty/sqlite/shell.c                   |  15353 ++
 thirdparty/sqlite/sqlite3.c                 | 207610 ++++++++++++++++++++
 thirdparty/sqlite/sqlite3.h                 |  10827 +
 thirdparty/sqlite/sqlite3ext.h              |    585 +
 15 files changed, 235324 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e1ff861a/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 166fd16..680b318 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -186,6 +186,11 @@ if (NOT DISABLE_SCRIPTING)
     createExtension(SCRIPTING-EXTENSIONS "SCRIPTING EXTENSIONS" "This enables scripting" "extensions/script" "${TEST_DIR}/script-tests")
 endif()
 
+## SQLite extensions
+option(ENABLE_SQLITE "Enables the SQLite extensions." OFF)
+if (ENABLE_ALL OR ENABLE_SQLITE)
+    createExtension(SQLITE-EXTENSIONS "SQLITE EXTENSIONS" "This enables sqlite" "extensions/sqlite" "${TEST_DIR}/sqlite-tests" "TRUE" "thirdparty/sqlite")
+endif()
 
 ## USB camera extensions
 option(ENABLE_USB_CAMERA "Enables USB camera support." OFF)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e1ff861a/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index cedd336..fbfa59c 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1076,6 +1076,32 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.
 
+This product bundles 'sqlite' which is in the public domain
+
+All of the code and documentation in SQLite has been dedicated to the public
+domain by the authors. All code authors, and representatives of the companies
+they work for, have signed affidavits dedicating their contributions to the
+public domain and originals of those signed affidavits are stored in a firesafe
+at the main offices of Hwaci. Anyone is free to copy, modify, publish, use,
+compile, sell, or distribute the original SQLite code, either in source code
+form or as a compiled binary, for any purpose, commercial or non-commercial,
+and by any means.
+
+The previous paragraph applies to the deliverable code and documentation in
+SQLite - those parts of the SQLite library that you actually bundle and ship
+with a larger application. Some scripts used as part of the build process (for
+example the "configure" scripts generated by autoconf) might fall under other
+open-source licenses. Nothing from these build scripts ever reaches the final
+deliverable SQLite library, however, and so the licenses associated with those
+scripts should not be a factor in assessing your rights to copy and use the
+SQLite library.
+
+All of the deliverable code in SQLite has been written from scratch. No code
+has been taken from other projects or from the open internet. Every line of
+code can be traced back to its original author, and all of those authors have
+public domain dedications on file. So the SQLite code base is clean and is
+uncontaminated with licensed code from other projects.
+
 This product bundles RapidJSON:
 Tencent is pleased to support the open source community by making RapidJSON available. 
  

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e1ff861a/PROCESSORS.md
----------------------------------------------------------------------
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 4ef4b99..9c6ef23 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -19,7 +19,7 @@
 
 - [AppendHostInfo](#appendhostinfo)
 - [CompressContent](#compresscontent)
-* [ConsumeMQTT](#consumeMQTT)
+- [ConsumeMQTT](#consumemqtt)
 - [ExecuteProcess](#executeprocess)
 - [ExecuteScript](#executescript)
 - [ExtractText](#extracttext)
@@ -34,8 +34,9 @@
 - [ManipulateArchive](#manipulatearchive)
 - [MergeContent](#mergecontent)
 - [PublishKafka](#publishkafka)
-* [PublishMQTT](PROCESSORS.md#publishMQTT)
+- [PublishMQTT](#publishmqtt)
 - [PutFile](#putfile)
+- [PutSQL](#putsql)
 - [RouteOnAttribute](#routeonattribute)
 - [TailFile](#tailfile)
 - [TFApplyGraph](#tfapplygraph)
@@ -508,6 +509,43 @@ default values, and whether a property supports the NiFi Expression Language.
 | success | Files that have been successfully written to the output directory are transferred to this relationship |
 | failure | Files that could not be written to the output directory for some reason are transferred to this relationship |
 
+## PutSQL
+
+### Description
+
+Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is
+expected to be the SQL command to execute. The SQL command may use the `?`
+character to bind parameters. In this case, the parameters to use must exist as
+FlowFile attributes with the naming convention `sql.args.N.type` and
+`sql.args.N.value`, where `N` is a positive integer. The content of the
+FlowFile is expected to be in UTF-8 format.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
+properties (not in bold) are considered optional. The table also indicates any
+default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+| **Connection URL** | | | The database connection URL (e.g. `sqlite://filename.db?cache=shared`) **Only SQLite is currently supported** |
+| SQL Statement | | | The SQL statement to execute. The statement can be empty, a constant value, or built from attributes using Expression Language. If this property is specified, it will be used regardless of the content of incoming flowfiles. If this property is empty, the content of the incoming flow file is expected to contain a valid SQL statement, to be issued by the processor to the database.<br>**Supports Expression Language: true** |
+| **Batch Size** | 1 | | The maximum number of FlowFiles to put to the database in a single transaction |
+
+### Relationships
+
+| Name | Description |
+| - | - |
+| retry | A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed |
+| success | A FlowFile is routed to this relationship after the database is successfully updated |
+| failure | A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, such as an invalid query or an integrity constraint violation |
+
+### Reads Attributes
+| Name | Description |
+| - | - |
+| sql.args.N.value | Incoming FlowFiles are expected to be parametrized SQL statements. The values of the parameters are specified as `sql.args.1.value`, `sql.args.2.value`, `sql.args.3.value`, and so on. The type of the `sql.args.1.value` parameter is specified by the `sql.args.1.type` attribute. |
+
+
 ## RouteOnAttribute
 
 ### Description

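Reviewer note on the PutSQL attribute convention documented above: the `sql.args.N.value` attributes are read in order starting at 1, and (as in the `onTrigger` loop later in this commit) collection stops at the first missing index. The following is a minimal standalone sketch of that lookup, not the committed code; `collect_sql_args` and the plain `std::map` standing in for FlowFile attributes are hypothetical names introduced only for illustration.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical helper (not part of the commit): gathers sql.args.1.value,
// sql.args.2.value, ... from an attribute map and stops at the first index
// that has no matching attribute.
std::vector<std::string> collect_sql_args(
    const std::map<std::string, std::string> &attributes) {
  std::vector<std::string> values;
  for (std::uint64_t i = 1;; ++i) {
    const std::string key = "sql.args." + std::to_string(i) + ".value";
    const auto it = attributes.find(key);
    if (it == attributes.end()) {
      break;  // a gap in the numbering ends the parameter list
    }
    values.push_back(it->second);
  }
  return values;
}
```

Under this sketch, a FlowFile carrying `sql.args.1.value`, `sql.args.2.value`, and `sql.args.4.value` would yield only the first two values, because index 3 is absent.
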
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e1ff861a/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 0c30eb0..46efc37 100644
--- a/README.md
+++ b/README.md
@@ -69,6 +69,7 @@ MiNiFi - C++ supports the following processors:
 * [PublishKafka](PROCESSORS.md#publishkafka)
 * [PublishMQTT](PROCESSORS.md#publishMQTT)
 * [PutFile](PROCESSORS.md#putfile)
+* [PutSQL](PROCESSORS.md#putsql)
 * [RouteOnAttribute](PROCESSORS.md#routeonattribute)
 * [TailFile](PROCESSORS.md#tailfile)
 * [TFApplyGraph](PROCESSORS.md#tfapplygraph)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e1ff861a/extensions/sqlite/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/sqlite/CMakeLists.txt b/extensions/sqlite/CMakeLists.txt
new file mode 100644
index 0000000..0ebb636
--- /dev/null
+++ b/extensions/sqlite/CMakeLists.txt
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
+set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
+
+include_directories(../../libminifi/include  ../../libminifi/include/core  ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/) 
+
+include_directories("${CMAKE_SOURCE_DIR}/thirdparty/sqlite")
+
+file(GLOB SOURCES "*.cpp")
+
+add_library(minifi-sqlite-extensions STATIC ${SOURCES})
+set_property(TARGET minifi-sqlite-extensions PROPERTY POSITION_INDEPENDENT_CODE ON)
+if(THREADS_HAVE_PTHREAD_ARG)
+  target_compile_options(minifi-sqlite-extensions PUBLIC "-pthread")
+endif()
+if(CMAKE_THREAD_LIBS_INIT)
+  target_link_libraries(minifi-sqlite-extensions "${CMAKE_THREAD_LIBS_INIT}")
+endif()
+
+target_link_libraries(minifi-sqlite-extensions ${CMAKE_DL_LIBS})
+target_link_libraries(minifi-sqlite-extensions sqlite)
+
+if (WIN32)
+    set_target_properties(minifi-sqlite-extensions PROPERTIES
+        LINK_FLAGS "/WHOLEARCHIVE"
+    )
+elseif (APPLE)
+    set_target_properties(minifi-sqlite-extensions PROPERTIES
+        LINK_FLAGS "-Wl,-all_load"
+    )
+else ()
+    set_target_properties(minifi-sqlite-extensions PROPERTIES
+        LINK_FLAGS "-Wl,--whole-archive"
+    )
+endif ()
+
+
+SET (SQLITE-EXTENSIONS minifi-sqlite-extensions PARENT_SCOPE)
+
+register_extension(minifi-sqlite-extensions)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e1ff861a/extensions/sqlite/PutSQL.cpp
----------------------------------------------------------------------
diff --git a/extensions/sqlite/PutSQL.cpp b/extensions/sqlite/PutSQL.cpp
new file mode 100644
index 0000000..5c5b79d
--- /dev/null
+++ b/extensions/sqlite/PutSQL.cpp
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutSQL.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property PutSQL::ConnectionURL(  // NOLINT
+    "Connection URL",
+    "The database URL to connect to",
+    "");
+core::Property PutSQL::SQLStatement(  // NOLINT
+    "SQL Statement",
+    "The SQL statement to execute",
+    "");
+core::Property PutSQL::BatchSize(  // NOLINT
+    "Batch Size",
+    "The maximum number of flow files to process in one batch",
+    "1");
+
+core::Relationship PutSQL::Success(  // NOLINT
+    "success",
+    "After a successful put SQL operation, FlowFiles are sent here");
+core::Relationship PutSQL::Retry(  // NOLINT
+    "retry",
+    "Failures which might work if retried");
+core::Relationship PutSQL::Failure(  // NOLINT
+    "failure",
+    "Failures which will not work if retried");
+
+void PutSQL::initialize() {
+  std::set<core::Property> properties;
+  properties.insert(ConnectionURL);
+  properties.insert(BatchSize);
+  properties.insert(SQLStatement);
+  setSupportedProperties(std::move(properties));
+
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  relationships.insert(Retry);
+  relationships.insert(Failure);
+  setSupportedRelationships(std::move(relationships));
+}
+
+void PutSQL::onSchedule(core::ProcessContext *context,
+                        core::ProcessSessionFactory *sessionFactory) {
+  context->getProperty(ConnectionURL.getName(), db_url_);
+
+  if (db_url_.empty()) {
+    logger_->log_error("Invalid Connection URL");
+  }
+
+  std::string batch_size;
+  context->getProperty(BatchSize.getName(), batch_size);
+
+  if (batch_size.empty()) {
+    batch_size_ = 100;
+  } else {
+    batch_size_ = std::stoull(batch_size);
+  }
+
+  context->getProperty(SQLStatement.getName(), sql_);
+}
+
+void PutSQL::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
+                       const std::shared_ptr<core::ProcessSession> &session) {
+  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->get());
+
+  if (!flow_file) {
+    return;
+  }
+
+  uint64_t batch_processed = 1;
+
+  try {
+    // Use an existing context, if one is available
+    std::shared_ptr<minifi::sqlite::SQLiteConnection> db;
+
+    if (conn_q_.try_dequeue(db)) {
+      logger_->log_debug("Using available SQLite connection");
+    }
+
+    if (!db) {
+      logger_->log_info("Creating new SQLite connection");
+      if (db_url_.substr(0, 9) == "sqlite://") {
+        db = std::make_shared<minifi::sqlite::SQLiteConnection>(db_url_.substr(9));
+      } else {
+        std::stringstream err_msg;
+        err_msg << "Connection URL '" << db_url_ << "' is unsupported";
+        logger_->log_error(err_msg.str().c_str());
+        throw std::runtime_error("Connection Error");
+      }
+    }
+
+    do {
+      auto sql = std::make_shared<std::string>();
+
+      if (sql_.empty()) {
+        // SQL is not defined as a property, so get SQL from the file content
+        SQLReadCallback cb(sql);
+        session->read(flow_file, &cb);
+      } else {
+        // SQL is defined as a property, so get the property dynamically w/ EL support
+        context->getProperty(SQLStatement.getName(), *sql, flow_file);
+      }
+
+      auto stmt = db->prepare(*sql);
+      for (uint64_t i = 1; i < UINT64_MAX; i++) {
+        std::string val;
+        std::stringstream val_key;
+        val_key << "sql.args." << i << ".value";
+
+        if (!flow_file->getAttribute(val_key.str(), val)) {
+          break;
+        }
+
+        stmt.bind_text(i, val);
+      }
+
+      stmt.step();
+
+      if (!stmt.is_ok()) {
+        logger_->log_error("SQL statement execution failed: %s", db->errormsg());
+        session->transfer(flow_file, Failure);
+      } else {
+        session->transfer(flow_file, Success);
+      }
+
+      flow_file = std::static_pointer_cast<FlowFileRecord>(session->get());
+
+      if (!flow_file) {
+        logger_->log_info("Processed %d in batch", batch_processed);
+        break;
+      }
+
+      batch_processed++;
+    } while (batch_processed < batch_size_);
+
+    // Make connection available for use again
+    if (conn_q_.size_approx() < getMaxConcurrentTasks()) {
+      logger_->log_debug("Releasing SQLite connection");
+      conn_q_.enqueue(db);
+    } else {
+      logger_->log_info("Destroying SQLite connection because it is no longer needed");
+    }
+  } catch (std::exception &exception) {
+    logger_->log_error("Caught Exception %s", exception.what());
+    session->transfer(flow_file, Failure);
+    this->yield();
+  } catch (...) {
+    logger_->log_error("Caught Exception");
+    session->transfer(flow_file, Failure);
+    this->yield();
+  }
+}
+
+int64_t PutSQL::SQLReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  sql_->resize(stream->getSize());
+  auto num_read = static_cast<uint64_t >(stream->readData(reinterpret_cast<uint8_t *>(&(*sql_)[0]),
+                                                          static_cast<int>(stream->getSize())));
+
+  if (num_read != stream->getSize()) {
+    throw std::runtime_error("SQLReadCallback failed to fully read flow file input stream");
+  }
+
+  return num_read;
+}
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

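Reviewer note on the connection handling in `onTrigger` above: the processor accepts only URLs that begin with `sqlite://` and passes the remainder to `SQLiteConnection`. The same check can be sketched without the magic number 9, assuming a free function is acceptable; `sqlite_path_from_url` is a hypothetical name and is not part of this commit.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of the commit): returns everything after the
// "sqlite://" scheme, or throws if the URL uses any other scheme.
std::string sqlite_path_from_url(const std::string &url) {
  static const std::string scheme = "sqlite://";
  // compare() on a prefix is safe even when url is shorter than the scheme
  if (url.compare(0, scheme.size(), scheme) != 0) {
    throw std::runtime_error("Connection URL '" + url + "' is unsupported");
  }
  return url.substr(scheme.size());
}
```

Deriving the offset from the scheme string keeps the prefix test and the `substr` call in sync if another scheme is ever added.
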
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e1ff861a/extensions/sqlite/PutSQL.h
----------------------------------------------------------------------
diff --git a/extensions/sqlite/PutSQL.h b/extensions/sqlite/PutSQL.h
new file mode 100644
index 0000000..2bac22d
--- /dev/null
+++ b/extensions/sqlite/PutSQL.h
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_PUTSQL_H
+#define NIFI_MINIFI_CPP_PUTSQL_H
+
+#include <core/Resource.h>
+#include <core/Processor.h>
+
+#include <concurrentqueue.h>
+
+#include "SQLiteConnection.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class PutSQL : public core::Processor {
+ public:
+  explicit PutSQL(const std::string &name, uuid_t uuid = nullptr)
+      : Processor(name, uuid),
+        logger_(logging::LoggerFactory<PutSQL>::getLogger()) {
+  }
+
+  static core::Property ConnectionURL;
+  static core::Property SQLStatement;
+  static core::Property BatchSize;
+
+  static core::Relationship Success;
+  static core::Relationship Retry;
+  static core::Relationship Failure;
+
+  void initialize() override;
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override {
+    logger_->log_error("onTrigger invocation with raw pointers is not implemented");
+  }
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
+                 const std::shared_ptr<core::ProcessSession> &session) override;
+
+  class SQLReadCallback : public InputStreamCallback {
+   public:
+    explicit SQLReadCallback(std::shared_ptr<std::string> sql)
+        : sql_(std::move(sql)) {
+    }
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+
+   private:
+    std::shared_ptr<std::string> sql_;
+  };
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+  moodycamel::ConcurrentQueue<std::shared_ptr<minifi::sqlite::SQLiteConnection>> conn_q_;
+
+  uint64_t batch_size_;
+  std::string db_url_;
+  std::string sql_;
+};
+
+REGISTER_RESOURCE(PutSQL); // NOLINT
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_PUTSQL_H

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e1ff861a/extensions/sqlite/SQLiteConnection.h
----------------------------------------------------------------------
diff --git a/extensions/sqlite/SQLiteConnection.h b/extensions/sqlite/SQLiteConnection.h
new file mode 100644
index 0000000..ed1f10c
--- /dev/null
+++ b/extensions/sqlite/SQLiteConnection.h
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_SQLITECONNECTION_H
+#define NIFI_MINIFI_CPP_SQLITECONNECTION_H
+
+#include <sqlite3.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sqlite {
+
+class SQLiteConnection;
+
+/**
+ * RAII wrapper for a sqlite3 prepared statement
+ */
+class SQLiteStatement {
+ public:
+  SQLiteStatement(sqlite3 *db, const std::string &sql)
+      : logger_(logging::LoggerFactory<SQLiteConnection>::getLogger()) {
+    if (sqlite3_prepare_v3(db, sql.c_str(), sql.size(), 0, &stmt_, nullptr)) {
+      std::stringstream err_msg;
+      err_msg << "Failed to create prepared statement: ";
+      err_msg << sql;
+      err_msg << " because ";
+      err_msg << sqlite3_errmsg(db);
+      throw std::runtime_error(err_msg.str());
+    }
+
+    if (!stmt_) {
+      std::stringstream err_msg;
+      err_msg << "Failed to create prepared statement: ";
+      err_msg << sql;
+      err_msg << " because statement was NULL";
+      throw std::runtime_error(err_msg.str());
+    }
+
+    db_ = db;
+  }
+
+  ~SQLiteStatement() {
+    sqlite3_finalize(stmt_);
+  }
+
+  void bind_text(int pos, const std::string &text) {
+    if (sqlite3_bind_text(stmt_, pos, text.c_str(), text.size(), SQLITE_TRANSIENT)) {
+      std::stringstream err_msg;
+      err_msg << "Failed to bind text parameter"
+              << pos
+              << ": "
+              << text
+              << " because "
+              << sqlite3_errmsg(db_);
+      throw std::runtime_error(err_msg.str());
+    }
+  }
+
+  void bind_int64(int pos, uint64_t val) {
+    if (sqlite3_bind_int64(stmt_, pos, val)) {
+      std::stringstream err_msg;
+      err_msg << "Failed to bind int64 parameter"
+              << pos
+              << ": "
+              << val
+              << " because "
+              << sqlite3_errmsg(db_);
+      throw std::runtime_error(err_msg.str());
+    }
+  }
+
+  void bind_double(int pos, double val) {
+    if (sqlite3_bind_double(stmt_, pos, val)) {
+      std::stringstream err_msg;
+      err_msg << "Failed to bind double parameter"
+              << pos
+              << ": "
+              << val
+              << " because "
+              << sqlite3_errmsg(db_);
+      throw std::runtime_error(err_msg.str());
+    }
+  }
+
+  void bind_null(int pos) {
+    if (sqlite3_bind_null(stmt_, pos)) {
+      std::stringstream err_msg;
+      err_msg << "Failed to bind NULL parameter"
+              << pos
+              << " because "
+              << sqlite3_errmsg(db_);
+      throw std::runtime_error(err_msg.str());
+    }
+  }
+
+  void step() {
+    int rc = sqlite3_step(stmt_);
+    if (rc == SQLITE_BUSY) {
+      reset_flags();
+      is_ok_ = false;
+      is_busy_ = true;
+    } else if (rc == SQLITE_DONE) {
+      reset_flags();
+      is_done_ = true;
+    } else if (rc == SQLITE_ROW) {
+      reset_flags();
+      is_row_ = true;
+    } else {
+      is_ok_ = false;
+      is_error_ = true;
+      std::stringstream err_msg;
+      err_msg << "Failed to step statement because "
+              << sqlite3_errmsg(db_);
+      throw std::runtime_error(err_msg.str());
+    }
+  }
+
+  bool is_ok() {
+    return is_ok_;
+  }
+
+  bool is_done() {
+    return is_done_;
+  }
+
+  bool is_row() {
+    return is_row_;
+  }
+
+  bool is_error() {
+    return is_error_;
+  }
+
+  bool is_busy() {
+    return is_busy_;
+  }
+
+  std::string column_text(int col) {
+    return std::string(reinterpret_cast<const char *>(sqlite3_column_text(stmt_, col)));
+  }
+
+  uint64_t  column_int64(int col) {
+    return sqlite3_column_int64(stmt_, col);
+  }
+
+  double column_double(int col) {
+    return sqlite3_column_double(stmt_, col);
+  }
+
+  bool column_is_int(int col) {
+    return SQLITE_INTEGER == sqlite3_column_type(stmt_, col);
+  }
+
+  bool column_is_float(int col) {
+    return SQLITE_FLOAT == sqlite3_column_type(stmt_, col);
+  }
+
+  bool column_is_text(int col) {
+    return SQLITE_TEXT == sqlite3_column_type(stmt_, col);
+  }
+
+  bool column_is_blob(int col) {
+    return SQLITE_BLOB == sqlite3_column_type(stmt_, col);
+  }
+
+  bool column_is_null(int col) {
+    return SQLITE_NULL == sqlite3_column_type(stmt_, col);
+  }
+
+  void reset() {
+    sqlite3_reset(stmt_);
+  }
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+
+  sqlite3_stmt *stmt_;
+  sqlite3 *db_ = nullptr;
+  bool is_ok_ = true;
+  bool is_busy_ = false;
+  bool is_done_ = false;
+  bool is_error_ = false;
+  bool is_row_ = false;
+
+  void reset_flags() {
+    is_ok_ = true;
+    is_busy_ = false;
+    is_done_ = false;
+    is_error_ = false;
+    is_row_ = false;
+  }
+};
+
+/**
+ * RAII wrapper for a sqlite3 connection
+ */
+class SQLiteConnection {
+ public:
+  SQLiteConnection(const std::string &filename)
+      : logger_(logging::LoggerFactory<SQLiteConnection>::getLogger()),
+        filename_(filename) {
+    logger_->log_info("Opening SQLite database: %s", filename_);
+
+    if (sqlite3_open(filename_.c_str(), &db_)) {
+      std::stringstream err_msg;
+      err_msg << "Failed to open database: " << filename_;
+      err_msg << " because ";
+      err_msg << sqlite3_errmsg(db_);
+      throw std::runtime_error(err_msg.str());
+    }
+  }
+
+  SQLiteConnection(SQLiteConnection &&other)
+      : logger_(std::move(other.logger_)),
+        filename_(std::move(other.filename_)),
+        db_(other.db_) {
+    other.db_ = nullptr;
+  }
+
+  ~SQLiteConnection() {
+    logger_->log_info("Closing SQLite database: %s", filename_);
+    sqlite3_close(db_);
+  }
+
+  SQLiteStatement prepare(const std::string sql) {
+    return SQLiteStatement(db_, sql);
+  }
+
+  std::string errormsg() {
+    return sqlite3_errmsg(db_);
+  }
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+  std::string filename_;
+
+  sqlite3 *db_ = nullptr;
+};
+
+} /* namespace sqlite */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_SQLITECONNECTION_H

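Reviewer note on the error paths in this header, which build their messages with `std::stringstream`: a stream constructed from an initial string starts writing at position 0, so later `<<` calls overwrite that text rather than append to it. The standalone illustration below is not part of the commit, and both function names are hypothetical.

```cpp
#include <sstream>
#include <string>

// Seeds the stream through the constructor; the write position starts at 0,
// so the inserted name overwrites the beginning of the seed text.
std::string seeded_message(const std::string &name) {
  std::stringstream msg("Failed to open database: ");
  msg << name;
  return msg.str();
}

// Streams the prefix instead, so the name is appended after it.
std::string streamed_message(const std::string &name) {
  std::stringstream msg;
  msg << "Failed to open database: " << name;
  return msg.str();
}
```

Opening the stream with `std::ios_base::ate` in its mode flags is another way to get append behavior; streaming the prefix is simply the shorter change.
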
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e1ff861a/libminifi/test/sqlite-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/test/sqlite-tests/CMakeLists.txt b/libminifi/test/sqlite-tests/CMakeLists.txt
new file mode 100644
index 0000000..f1d9470
--- /dev/null
+++ b/libminifi/test/sqlite-tests/CMakeLists.txt
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB SQLITE_INTEGRATION_TESTS "*.cpp")
+SET(SQLITE-EXTENSIONS_TEST_COUNT 0)
+FOREACH(testfile ${SQLITE_INTEGRATION_TESTS})
+  get_filename_component(testfilename "${testfile}" NAME_WE)
+  add_executable("${testfilename}" "${testfile}")
+  target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/sqlite")
+  target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/thirdparty/sqlite")
+
+  if (APPLE)
+    target_link_libraries (${testfilename} -Wl,-all_load minifi-sqlite-extensions)
+  else ()
+    target_link_libraries (${testfilename} -Wl,--whole-archive minifi-sqlite-extensions -Wl,--no-whole-archive)
+  endif ()
+
+  createTests("${testfilename}")
+  target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+#  target_link_libraries(minifi-sqlite-extensions sqlite)
+  MATH(EXPR SQLITE-EXTENSIONS_TEST_COUNT "${SQLITE-EXTENSIONS_TEST_COUNT}+1")
+  add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${SQLITE-EXTENSIONS_TEST_COUNT} SQLite related test file(s)...")

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e1ff861a/libminifi/test/sqlite-tests/SQLiteTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/sqlite-tests/SQLiteTests.cpp b/libminifi/test/sqlite-tests/SQLiteTests.cpp
new file mode 100644
index 0000000..6d83351
--- /dev/null
+++ b/libminifi/test/sqlite-tests/SQLiteTests.cpp
@@ -0,0 +1,218 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <set>
+#include <iostream>
+#include <GenerateFlowFile.h>
+#include <UpdateAttribute.h>
+#include <LogAttribute.h>
+
+#include "../TestBase.h"
+
+#include "processors/GetFile.h"
+#include "processors/PutFile.h"
+
+#include "PutSQL.h"
+
+TEST_CASE("Test Creation of PutSQL", "[PutSQLCreate]") {  // NOLINT
+  TestController testController;
+  std::shared_ptr<core::Processor>
+      processor = std::make_shared<org::apache::nifi::minifi::processors::PutSQL>("processorname");
+  REQUIRE(processor->getName() == "processorname");
+}
+
+TEST_CASE("Test Put", "[PutSQLPut]") {  // NOLINT
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::GenerateFlowFile>();
+  LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<processors::PutSQL>();
+
+  auto plan = testController.createPlan();
+  auto repo = std::make_shared<TestRepository>();
+
+  // Define directory for test db
+  std::string test_dir("/tmp/gt.XXXXXX");
+  REQUIRE(testController.createTempDirectory(&test_dir[0]) != nullptr);
+
+  // Define test db file
+  std::string test_db(test_dir);
+  test_db.append("/test.db");
+
+  // Create test db
+  {
+    minifi::sqlite::SQLiteConnection db(test_db);
+    auto stmt = db.prepare("CREATE TABLE test_table (int_col INTEGER, text_col TEXT);");
+    stmt.step();
+    REQUIRE(stmt.is_ok());
+  }
+
+  // Build MiNiFi processing graph
+  auto generate = plan->addProcessor(
+      "GenerateFlowFile",
+      "Generate");
+  auto update = plan->addProcessor(
+      "UpdateAttribute",
+      "Update",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      update,
+      "sql.args.1.value",
+      "42",
+      true);
+  plan->setProperty(
+      update,
+      "sql.args.2.value",
+      "asdf",
+      true);
+  auto log = plan->addProcessor(
+      "LogAttribute",
+      "Log",
+      core::Relationship("success", "description"),
+      true);
+  auto put = plan->addProcessor(
+      "PutSQL",
+      "PutSQL",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      put,
+      "Connection URL",
+      "sqlite://" + test_db);
+  plan->setProperty(
+      put,
+      "SQL Statement",
+      "INSERT INTO test_table (int_col, text_col) VALUES (?, ?)");
+
+  plan->runNextProcessor();  // Generate
+  plan->runNextProcessor();  // Update
+  plan->runNextProcessor();  // Log
+  plan->runNextProcessor();  // PutSQL
+
+  // Verify output state
+  {
+    minifi::sqlite::SQLiteConnection db(test_db);
+    auto stmt = db.prepare("SELECT int_col, text_col FROM test_table;");
+    stmt.step();
+    REQUIRE(stmt.is_ok());
+    REQUIRE(42 == stmt.column_int64(0));
+    REQUIRE("asdf" == stmt.column_text(1));
+  }
+}
+
+TEST_CASE("Test Put Content", "[PutSQLPutContent]") {  // NOLINT
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::GetFile>();
+  LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<processors::PutSQL>();
+
+  auto plan = testController.createPlan();
+  auto repo = std::make_shared<TestRepository>();
+
+  // Define directory for test db
+  std::string test_dir("/tmp/gt.XXXXXX");
+  REQUIRE(testController.createTempDirectory(&test_dir[0]) != nullptr);
+
+  // Define test db file
+  std::string test_db(test_dir);
+  test_db.append("/test.db");
+
+  // Define directory for test input file
+  std::string test_in_dir("/tmp/gt.XXXXXX");
+  REQUIRE(testController.createTempDirectory(&test_in_dir[0]) != nullptr);
+
+  // Define test input file
+  std::string test_file(test_in_dir);
+  test_file.append("/test.in");
+
+  // Write test SQL content
+  {
+    std::ofstream os(test_file);
+    os << "INSERT INTO test_table VALUES(?, ?);";
+  }
+
+  // Create test db
+  {
+    minifi::sqlite::SQLiteConnection db(test_db);
+    auto stmt = db.prepare("CREATE TABLE test_table (int_col INTEGER, text_col TEXT);");
+    stmt.step();
+    REQUIRE(stmt.is_ok());
+  }
+
+  // Build MiNiFi processing graph
+  auto get_file = plan->addProcessor(
+      "GetFile",
+      "Get");
+  plan->setProperty(
+      get_file,
+      processors::GetFile::Directory.getName(), test_in_dir);
+  auto update = plan->addProcessor(
+      "UpdateAttribute",
+      "Update",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      update,
+      "sql.args.1.value",
+      "4242",
+      true);
+  plan->setProperty(
+      update,
+      "sql.args.2.value",
+      "fdsa",
+      true);
+  auto log = plan->addProcessor(
+      "LogAttribute",
+      "Log",
+      core::Relationship("success", "description"),
+      true);
+  auto put = plan->addProcessor(
+      "PutSQL",
+      "PutSQL",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      put,
+      "Connection URL",
+      "sqlite://" + test_db);
+
+  plan->runNextProcessor();  // Get
+  plan->runNextProcessor();  // Update
+  plan->runNextProcessor();  // Log
+  plan->runNextProcessor();  // PutSQL
+
+  // Verify output state
+  {
+    minifi::sqlite::SQLiteConnection db(test_db);
+    auto stmt = db.prepare("SELECT int_col, text_col FROM test_table;");
+    stmt.step();
+    REQUIRE(stmt.is_ok());
+    REQUIRE(4242 == stmt.column_int64(0));
+    REQUIRE("fdsa" == stmt.column_text(1));
+  }
+}
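
Note on the tests above: both cases set flow file attributes named sql.args.1.value and sql.args.2.value and expect them to fill the two "?" placeholders in order. The following is a minimal sketch of that naming convention only, not the PutSQL implementation (PutSQL.cpp is not shown in this part of the message). The helper name collect_sql_args is hypothetical, and the assumption that arguments are taken in ascending numeric order of N is inferred from the test expectations.

```cpp
#include <cassert>
#include <map>
#include <regex>
#include <string>
#include <vector>

// Hypothetical helper, not part of the commit: gathers attributes named
// "sql.args.N.value" and returns their values ordered by N (numerically,
// so N=10 sorts after N=2). Attributes with other names are ignored.
std::vector<std::string> collect_sql_args(
    const std::map<std::string, std::string> &attrs) {
  static const std::regex pattern(R"(sql\.args\.(\d+)\.value)");
  std::map<unsigned long, std::string> ordered;  // keyed by numeric index N
  for (const auto &kv : attrs) {
    std::smatch m;
    if (std::regex_match(kv.first, m, pattern)) {
      ordered[std::stoul(m[1].str())] = kv.second;
    }
  }
  std::vector<std::string> args;
  for (const auto &kv : ordered) {
    args.push_back(kv.second);
  }
  return args;
}
```

With the attributes from "Test Put" (sql.args.1.value = "42", sql.args.2.value = "asdf"), this sketch would yield the values in the order the INSERT statement's placeholders expect them.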

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e1ff861a/thirdparty/sqlite/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/sqlite/CMakeLists.txt b/thirdparty/sqlite/CMakeLists.txt
new file mode 100644
index 0000000..9de5ccf
--- /dev/null
+++ b/thirdparty/sqlite/CMakeLists.txt
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+project(sqlite VERSION 3.22.0 LANGUAGES CXX)
+
+add_library(sqlite sqlite3.c)
