fgerlits commented on a change in pull request #1004: URL: https://github.com/apache/nifi-minifi-cpp/pull/1004#discussion_r603102486
########## File path: libminifi/test/sql-tests/ExecuteSQLTests.cpp ########## @@ -0,0 +1,195 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#undef NDEBUG + +#include "SQLTestController.h" +#include "processors/ExecuteSQL.h" +#include "Utils.h" +#include "FlowFileMatcher.h" + +TEST_CASE("ExecuteSQL works without incoming flow file", "[ExecuteSQL1]") { + SQLTestController controller; + + auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}}); + auto sql_proc = plan->getSQLProcessor(); + sql_proc->setProperty("SQL select query", "SELECT * FROM test_table ORDER BY int_col ASC"); + + controller.insertValues({{11, "one"}, {22, "two"}}); + + plan->run(); + + auto flow_files = plan->getOutputs({"success", "d"}); + REQUIRE(flow_files.size() == 1); + std::string row_count; + flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count); + REQUIRE(row_count == "2"); + + auto content = plan->getContent(flow_files[0]); + verifyJSON(content, R"( + [{ + "int_col": 11, + "text_col": "one" + },{ + "int_col": 22, + "text_col": "two" + }] + )"); +} + +TEST_CASE("ExecuteSQL uses statement in property", "[ExecuteSQL2]") { + SQLTestController controller; + + auto plan = 
controller.createSQLPlan("ExecuteSQL", {{"success", "d"}}); + auto sql_proc = plan->getSQLProcessor(); + sql_proc->setProperty("SQL select query", "SELECT * FROM test_table WHERE int_col == ${int_col_value}"); Review comment: I'm surprised `==` works here. Single `=` is more standard in SQL; I think the test should either have `=`, or two versions, one with `=` and one with `==`. (Also in `ExecuteSQL4`.) ########## File path: extensions/sql/processors/QueryDatabaseTable.cpp ########## @@ -75,361 +72,234 @@ const core::Property QueryDatabaseTable::s_maxValueColumnNames( "If no columns are provided, all rows from the table will be considered, which could have a performance impact. " "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. " "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', " - "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")-> - supportsExpressionLanguage(true)->build()); + "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.") + ->supportsExpressionLanguage(true)->build()); -const core::Property QueryDatabaseTable::s_whereClause( - core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription( - "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build()); +const core::Property QueryDatabaseTable::WhereClause( + core::PropertyBuilder::createProperty("Where Clause") + ->isRequired(false) + ->withDescription("A custom clause to be added in the WHERE condition when building SQL queries.") + ->supportsExpressionLanguage(true)->build()); -const core::Property QueryDatabaseTable::s_sqlQuery( - 
core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription( - "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. " - "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build()); +const std::string QueryDatabaseTable::InitialMaxValueDynamicPropertyPrefix("initial.maxvalue."); Review comment: Some documentation about this dynamic property should be included in PROCESSORS.md. (Especially since, according to the unit test, it does not work as I first thought it does, based on its name: only rows with column value greater than the attribute value are returned, rather than greater or equal.) ########## File path: libminifi/test/sql-tests/ExecuteSQLTests.cpp ########## @@ -0,0 +1,195 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#undef NDEBUG + +#include "SQLTestController.h" +#include "processors/ExecuteSQL.h" +#include "Utils.h" +#include "FlowFileMatcher.h" + +TEST_CASE("ExecuteSQL works without incoming flow file", "[ExecuteSQL1]") { + SQLTestController controller; + + auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}}); + auto sql_proc = plan->getSQLProcessor(); + sql_proc->setProperty("SQL select query", "SELECT * FROM test_table ORDER BY int_col ASC"); + + controller.insertValues({{11, "one"}, {22, "two"}}); + + plan->run(); + + auto flow_files = plan->getOutputs({"success", "d"}); + REQUIRE(flow_files.size() == 1); + std::string row_count; + flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count); + REQUIRE(row_count == "2"); + + auto content = plan->getContent(flow_files[0]); + verifyJSON(content, R"( + [{ + "int_col": 11, + "text_col": "one" + },{ + "int_col": 22, + "text_col": "two" + }] + )"); +} + +TEST_CASE("ExecuteSQL uses statement in property", "[ExecuteSQL2]") { + SQLTestController controller; + + auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}}); + auto sql_proc = plan->getSQLProcessor(); + sql_proc->setProperty("SQL select query", "SELECT * FROM test_table WHERE int_col == ${int_col_value}"); + + controller.insertValues({{11, "one"}, {22, "two"}}); + + auto input_file = plan->addInput({{"int_col_value", "11"}}); + + plan->run(); + + auto flow_files = plan->getOutputs({"success", "d"}); + REQUIRE(flow_files.size() == 1); + std::string row_count; + flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count); + REQUIRE(row_count == "1"); + + auto content = plan->getContent(flow_files[0]); + verifyJSON(content, R"( + [{ + "int_col": 11, + "text_col": "one" + }] + )"); +} + +TEST_CASE("ExecuteSQL uses statement in content", "[ExecuteSQL3]") { + SQLTestController controller; + + auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}}); + + controller.insertValues({{11, 
"one"}, {22, "two"}}); + + auto input_file = plan->addInput({}, "SELECT * FROM test_table ORDER BY int_col ASC"); + + plan->run(); + + auto flow_files = plan->getOutputs({"success", "d"}); + REQUIRE(flow_files.size() == 1); + std::string row_count; + flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count); + REQUIRE(row_count == "2"); + + auto content = plan->getContent(flow_files[0]); + verifyJSON(content, R"( + [{ + "int_col": 11, + "text_col": "one" + },{ + "int_col": 22, + "text_col": "two" + }] + )"); +} + +TEST_CASE("ExecuteSQL uses sql.args.N.value attributes", "[ExecuteSQL4]") { + SQLTestController controller; + + auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}}); + + controller.insertValues({{11, "apple"}, {11, "banana"}, {22, "banana"}}); + + auto input_file = plan->addInput({ + {"sql.args.1.value", "11"}, + {"sql.args.2.value", "banana"} + }, "SELECT * FROM test_table WHERE int_col == ? AND text_col == ?"); + + plan->run(); + + auto flow_files = plan->getOutputs({"success", "d"}); + REQUIRE(flow_files.size() == 1); + std::string row_count; + flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count); + REQUIRE(row_count == "1"); + + auto content = plan->getContent(flow_files[0]); + verifyJSON(content, R"( + [{ + "int_col": 11, + "text_col": "banana" + }] + )"); +} + +TEST_CASE("ExecuteSQL honors Max Rows Per Flow File", "[ExecuteSQL5]") { + SQLTestController controller; + + auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}}); + auto sql_proc = plan->getSQLProcessor(); + sql_proc->setProperty(processors::ExecuteSQL::MaxRowsPerFlowFile.getName(), "2"); + sql_proc->setProperty(processors::ExecuteSQL::SQLSelectQuery.getName(), "SELECT text_col FROM test_table ORDER BY int_col ASC"); + + controller.insertValues({ + {101, "apple"}, + {102, "banana"}, + {103, "pear"}, + {104, "strawberry"}, + {105, "pineapple"} + }); + + auto input_file = plan->addInput(); + + plan->run(); + 
+ auto content_verifier = [&] (const std::shared_ptr<core::FlowFile>& actual, const std::string& expected) { + verifyJSON(plan->getContent(actual), expected); + }; + + FlowFileMatcher matcher{content_verifier, { + processors::ExecuteSQL::RESULT_ROW_COUNT, + processors::ExecuteSQL::FRAGMENT_COUNT, + processors::ExecuteSQL::FRAGMENT_INDEX, + processors::ExecuteSQL::FRAGMENT_IDENTIFIER + }}; + + utils::optional<std::string> fragment_id; + + auto flow_files = plan->getOutputs({"success", "d"}); + REQUIRE(flow_files.size() == 3); + matcher.verify(flow_files[0], + {"2", "3", "0", var("frag_id")}, + R"([{"text_col": "apple"}, {"text_col": "banana"}])"); + matcher.verify(flow_files[1], + {"2", "3", "1", var("frag_id")}, + R"([{"text_col": "pear"}, {"text_col": "strawberry"}])"); + matcher.verify(flow_files[2], + {"1", "3", "2", var("frag_id")}, Review comment: After staring at it for a while, I _think_ this checks that `FRAGMENT_IDENTIFIER` is the same in all three flow files, but this code is not easy to read. I think something like this would be clearer: ```c++ utils::optional<std::string> fragment_id; matcher.verify(..., capture(fragment_id), ...); REQUIRE(fragment_id); matcher.verify(..., *fragment_id, ...); matcher.verify(..., *fragment_id, ...); ``` i.e. `AttributeValue` would have a "capture" mode instead of an "is_variable" mode. ########## File path: libminifi/test/sql-tests/SQLTestPlan.h ########## @@ -0,0 +1,98 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "../TestBase.h" + +class SQLTestPlan { + public: + SQLTestPlan(TestController& controller, const std::string& connection_str, const std::string& sql_processor, std::initializer_list<core::Relationship> output_rels) { + plan_ = controller.createPlan(); + processor_ = plan_->addProcessor(sql_processor, sql_processor); + plan_->setProperty(processor_, "DB Controller Service", "ODBCService"); + input_ = plan_->addConnection({}, {"success", "d"}, processor_); + for (const auto& output_rel : output_rels) { + outputs_[output_rel] = plan_->addConnection(processor_, output_rel, {}); + } + + // initialize database service + auto service = plan_->addController("ODBCService", "ODBCService"); + plan_->setProperty(service, minifi::sql::controllers::DatabaseService::ConnectionString.getName(), connection_str); + } + + std::string getContent(const std::shared_ptr<core::FlowFile>& flow_file) { + return plan_->getContent(flow_file); + } + + std::shared_ptr<core::FlowFile> addInput(std::initializer_list<std::pair<std::string, std::string>> attributes = {}, const utils::optional<std::string>& content = {}) { + auto flow_file = std::make_shared<minifi::FlowFileRecord>(); + for (const auto& attr : attributes) { + flow_file->setAttribute(attr.first, attr.second); + } + if (content) { + auto claim = std::make_shared<minifi::ResourceClaim>(plan_->getContentRepo()); + auto content_stream = plan_->getContentRepo()->write(*claim); + int ret = content_stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(content->c_str())), content->length()); 
+ REQUIRE(ret == content->length()); + flow_file->setOffset(0); + flow_file->setSize(content->length()); + flow_file->setResourceClaim(claim); + } + input_->put(flow_file); + return flow_file; + } + + std::shared_ptr<core::Processor> getSQLProcessor() { + return processor_; + } + + void run(bool reschedule = false) { + if (reschedule) { + plan_->reset(reschedule); + } + plan_->runProcessor(0); // run the one and only sql processor + } + + std::map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> getAllOutputs() { Review comment: I don't mind keeping this if you think it will be useful in the future, but `getAllOutputs()` is not used anywhere. ########## File path: libminifi/test/sql-tests/SQLTestPlan.h ########## @@ -0,0 +1,98 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "../TestBase.h" + +class SQLTestPlan { + public: + SQLTestPlan(TestController& controller, const std::string& connection_str, const std::string& sql_processor, std::initializer_list<core::Relationship> output_rels) { + plan_ = controller.createPlan(); + processor_ = plan_->addProcessor(sql_processor, sql_processor); + plan_->setProperty(processor_, "DB Controller Service", "ODBCService"); + input_ = plan_->addConnection({}, {"success", "d"}, processor_); + for (const auto& output_rel : output_rels) { + outputs_[output_rel] = plan_->addConnection(processor_, output_rel, {}); + } + + // initialize database service + auto service = plan_->addController("ODBCService", "ODBCService"); + plan_->setProperty(service, minifi::sql::controllers::DatabaseService::ConnectionString.getName(), connection_str); + } + + std::string getContent(const std::shared_ptr<core::FlowFile>& flow_file) { + return plan_->getContent(flow_file); + } + + std::shared_ptr<core::FlowFile> addInput(std::initializer_list<std::pair<std::string, std::string>> attributes = {}, const utils::optional<std::string>& content = {}) { + auto flow_file = std::make_shared<minifi::FlowFileRecord>(); + for (const auto& attr : attributes) { + flow_file->setAttribute(attr.first, attr.second); + } + if (content) { + auto claim = std::make_shared<minifi::ResourceClaim>(plan_->getContentRepo()); + auto content_stream = plan_->getContentRepo()->write(*claim); + int ret = content_stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(content->c_str())), content->length()); + REQUIRE(ret == content->length()); + flow_file->setOffset(0); + flow_file->setSize(content->length()); + flow_file->setResourceClaim(claim); + } + input_->put(flow_file); + return flow_file; + } + + std::shared_ptr<core::Processor> getSQLProcessor() { + return processor_; + } + + void run(bool reschedule = false) { + if (reschedule) { + plan_->reset(reschedule); + } + plan_->runProcessor(0); // run the one and only 
sql processor + } + + std::map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> getAllOutputs() { + std::map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> flow_file_map; + for (const auto& output : outputs_) { + flow_file_map[output.first] = getOutputs(output.first); + } + return flow_file_map; + } + + std::vector<std::shared_ptr<core::FlowFile>> getOutputs(const core::Relationship& relationship) { + auto conn = outputs_[relationship]; + REQUIRE(conn); + std::vector<std::shared_ptr<core::FlowFile>> flow_files; + std::set<std::shared_ptr<core::FlowFile>> expired; + while (auto flow_file = conn->poll(expired)) { + REQUIRE(expired.empty()); + flow_files.push_back(std::move(flow_file)); + } + REQUIRE(expired.empty()); Review comment: We have already checked this in line 86, so this line can be removed. ########## File path: .github/workflows/ci.yml ########## @@ -132,12 +142,13 @@ jobs: run: | sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test" sudo apt update - sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev + sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc + sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so Review comment: Should we do this symlinking in (the OS-specific script called from) bootstrap.sh? ########## File path: libminifi/test/sql-tests/QueryDatabaseTableTests.cpp ########## @@ -0,0 +1,248 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#undef NDEBUG + +#include "../TestBase.h" +#include "SQLTestController.h" +#include "Utils.h" +#include "FlowFileMatcher.h" + +TEST_CASE("QueryDatabaseTable queries the table and returns specified columns", "[QueryDatabaseTable1]") { + SQLTestController controller; + + auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}}); + auto sql_proc = plan->getSQLProcessor(); + sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "test_table"); + sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col"); + sql_proc->setProperty(processors::QueryDatabaseTable::ColumnNames.getName(), "text_col"); + + controller.insertValues({ + {101, "one"}, + {102, "two"}, + {103, "three"} + }); + + plan->run(); + + auto flow_files = plan->getOutputs({"success", "d"}); + REQUIRE(flow_files.size() == 1); + + std::string row_count; + flow_files[0]->getAttribute(processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count); + REQUIRE(row_count == "3"); + auto content = plan->getContent(flow_files[0]); + verifyJSON(content, R"( + [{"text_col": "one"}, {"text_col": "two"}, {"text_col": "three"}] + )", true); +} + +TEST_CASE("QueryDatabaseTable requerying the table returns only new rows", "[QueryDatabaseTable2]") { + SQLTestController controller; + + auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}}); + 
auto sql_proc = plan->getSQLProcessor(); + sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "test_table"); + sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col"); + sql_proc->setProperty(processors::QueryDatabaseTable::ColumnNames.getName(), "text_col"); + + controller.insertValues({ + {101, "one"}, + {102, "two"}, + {103, "three"} + }); + + plan->run(); + + auto first_flow_files = plan->getOutputs({"success", "d"}); + REQUIRE(first_flow_files.size() == 1); + + controller.insertValues({ + {104, "four"}, + {105, "five"} + }); + + SECTION("Without schedule") {plan->run();} + SECTION("With schedule") {plan->run(true);} Review comment: These section descriptions are not clear; I would change them to something like "Run onTrigger only" and "Run both onSchedule and onTrigger". ########## File path: libminifi/test/sql-tests/PutSQLTests.cpp ########## @@ -0,0 +1,78 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#undef NDEBUG + +#include "../TestBase.h" +#include "SQLTestController.h" + +#include "processors/PutSQL.h" +#include "processors/GenerateFlowFile.h" +#include "processors/UpdateAttribute.h" +#include "processors/LogAttribute.h" +#include "processors/GetFile.h" + +TEST_CASE("Test Creation of PutSQL", "[PutSQLCreate]") { // NOLINT Review comment: Are the `NOLINT` annotations on the `TEST_CASE`s in this file necessary? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
