This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 24b433ce7166cd66070668cb6aabd482f1643b84 Author: Adam Debreceni <[email protected]> AuthorDate: Tue Jan 11 10:52:34 2022 +0100 MINIFICPP-1718 - Handle case sensitive column names Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1238 --- docker/test/integration/features/sql.feature | 34 ++++++++++-- docker/test/integration/minifi/core/ImageStore.py | 3 ++ docker/test/integration/steps/steps.py | 6 +-- extensions/sql/data/JSONSQLWriter.cpp | 1 - extensions/sql/data/MaxCollector.h | 20 ++++---- .../data/{Utils.cpp => SQLColumnIdentifier.cpp} | 40 +++++++-------- extensions/sql/data/SQLColumnIdentifier.h | 58 +++++++++++++++++++++ extensions/sql/data/SQLRowsetProcessor.cpp | 5 +- extensions/sql/data/Utils.h | 36 ------------- extensions/sql/processors/QueryDatabaseTable.cpp | 60 ++++++++++++---------- extensions/sql/processors/QueryDatabaseTable.h | 7 +-- .../test/sql-tests/SQLColumnIdentifierTests.cpp | 56 ++++++++++++++++++++ libminifi/test/sql-tests/mocks/MockConnectors.cpp | 28 ++++++---- libminifi/test/sql-tests/mocks/MockConnectors.h | 2 +- 14 files changed, 236 insertions(+), 120 deletions(-) diff --git a/docker/test/integration/features/sql.feature b/docker/test/integration/features/sql.feature index f3c5eeb..39978dc 100644 --- a/docker/test/integration/features/sql.feature +++ b/docker/test/integration/features/sql.feature @@ -12,7 +12,7 @@ Feature: Executing SQL operations from MiNiFi-C++ And a PutSQL processor with the "SQL Statement" property set to "INSERT INTO test_table (int_col, text_col) VALUES (?, ?)" And the "success" relationship of the GenerateFlowFile processor is connected to the UpdateAttribute And the "success" relationship of the UpdateAttribute processor is connected to the PutSQL - And an ODBCService is setup up for PutSQL with the name "ODBCService" and connection string "Driver={PostgreSQL ANSI};Server=postgresql-server;Port=5432;Database=postgres;Uid=postgres;Pwd=password;" + And an ODBCService is setup up for PutSQL with the name "ODBCService" And a PostgreSQL server is set up When all instances start up Then the query "SELECT * FROM test_table WHERE int_col = 42" returns 1 rows in less than 120 seconds on the PostgreSQL server @@ -27,11 +27,27 @@ Feature: Executing SQL operations from MiNiFi-C++ And the "success" relationship of the GenerateFlowFile processor is connected to the UpdateAttribute And the "success" relationship of the UpdateAttribute processor is connected to the ExecuteSQL And the "success" relationship of the ExecuteSQL processor is connected to the PutFile - And an ODBCService is setup up for ExecuteSQL with the name "ODBCService" and connection string "Driver={PostgreSQL ANSI};Server=postgresql-server;Port=5432;Database=postgres;Uid=postgres;Pwd=password;" + And an ODBCService is setup up for ExecuteSQL with the name "ODBCService" And a PostgreSQL server is set up When all instances start up Then at least one flowfile with the content '[{"int_col":2,"text_col":"banana"},{"int_col":1,"text_col":"apple"}]' is placed in the monitored directory in less than 120 seconds + Scenario: A MiNiFi instance can query to test table containing mixed case column names with ExecuteSQL processor + Given a GenerateFlowFile processor with the "File Size" property set to "0B" + And a UpdateAttribute processor with the "sql.args.1.value" property set to "ApPlE" + And the "sql.args.2.value" property of the UpdateAttribute processor is set to "BaNaNa" + # in PostgreSQL we have to quote column names if they contain uppercase characters + And a ExecuteSQL processor with the "SQL select query" property set to "SELECT * FROM test_table2 WHERE "tExT_Col" = ? OR "tExT_Col" = ? ORDER BY int_col DESC" + And the "Output Format" property of the ExecuteSQL processor is set to "JSON" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the GenerateFlowFile processor is connected to the UpdateAttribute + And the "success" relationship of the UpdateAttribute processor is connected to the ExecuteSQL + And the "success" relationship of the ExecuteSQL processor is connected to the PutFile + And an ODBCService is setup up for ExecuteSQL with the name "ODBCService" + And a PostgreSQL server is set up + When all instances start up + Then at least one flowfile with the content '[{"int_col":6,"tExT_Col":"BaNaNa"},{"int_col":5,"tExT_Col":"ApPlE"}]' is placed in the monitored directory in less than 120 seconds + Scenario: A MiNiFi instance can query to test table with QueryDatabaseTable processor Given a QueryDatabaseTable processor with the "Table Name" property set to "test_table" And the "Columns to Return" property of the QueryDatabaseTable processor is set to "text_col" @@ -39,7 +55,19 @@ Feature: Executing SQL operations from MiNiFi-C++ And the "Output Format" property of the QueryDatabaseTable processor is set to "JSON" And a PutFile processor with the "Directory" property set to "/tmp/output" And the "success" relationship of the QueryDatabaseTable processor is connected to the PutFile - And an ODBCService is setup up for QueryDatabaseTable with the name "ODBCService" and connection string "Driver={PostgreSQL ANSI};Server=postgresql-server;Port=5432;Database=postgres;Uid=postgres;Pwd=password;" + And an ODBCService is setup up for QueryDatabaseTable with the name "ODBCService" And a PostgreSQL server is set up When all instances start up Then at least one flowfile with the content '[{"text_col":"apple"}]' is placed in the monitored directory in less than 120 seconds + + Scenario: A MiNiFi instance can query to test table containing mixed case column names with QueryDatabaseTable processor + Given a QueryDatabaseTable processor with the "Table Name" property set to "test_table2" + And the "Columns to Return" property of the QueryDatabaseTable processor is set to ""tExT_Col"" + And the "Where Clause" property of the QueryDatabaseTable processor is set to "int_col = 5" + And the "Output Format" property of the QueryDatabaseTable processor is set to "JSON" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the QueryDatabaseTable processor is connected to the PutFile + And an ODBCService is setup up for QueryDatabaseTable with the name "ODBCService" + And a PostgreSQL server is set up + When all instances start up + Then at least one flowfile with the content '[{"tExT_Col":"ApPlE"}]' is placed in the monitored directory in less than 120 seconds diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py index c1ef051..70969c8 100644 --- a/docker/test/integration/minifi/core/ImageStore.py +++ b/docker/test/integration/minifi/core/ImageStore.py @@ -117,6 +117,9 @@ class ImageStore: echo " INSERT INTO test_table (int_col, text_col) VALUES (1, 'apple');" >> /docker-entrypoint-initdb.d/init-user-db.sh && \ echo " INSERT INTO test_table (int_col, text_col) VALUES (2, 'banana');" >> /docker-entrypoint-initdb.d/init-user-db.sh && \ echo " INSERT INTO test_table (int_col, text_col) VALUES (3, 'pear');" >> /docker-entrypoint-initdb.d/init-user-db.sh && \ + echo " CREATE TABLE test_table2 (int_col INTEGER, \\"tExT_Col\\" TEXT);" >> /docker-entrypoint-initdb.d/init-user-db.sh && \ + echo " INSERT INTO test_table2 (int_col, \\"tExT_Col\\") VALUES (5, 'ApPlE');" >> /docker-entrypoint-initdb.d/init-user-db.sh && \ + echo " INSERT INTO test_table2 (int_col, \\"tExT_Col\\") VALUES (6, 'BaNaNa');" >> /docker-entrypoint-initdb.d/init-user-db.sh && \ echo "EOSQL" >> /docker-entrypoint-initdb.d/init-user-db.sh """.format(base_image='postgres:13.2')) return self.__build_image(dockerfile) diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py index f14dfe0..92d3ebe 100644 --- a/docker/test/integration/steps/steps.py +++ b/docker/test/integration/steps/steps.py @@ -343,9 +343,9 @@ def step_impl(context, topic_name): # SQL -@given("an ODBCService is setup up for {processor_name} with the name \"{service_name}\" and connection string \"{connection_string}\"") -def step_impl(context, processor_name, service_name, connection_string): - odbc_service = ODBCService(name=service_name, connection_string=connection_string) +@given("an ODBCService is setup up for {processor_name} with the name \"{service_name}\"") +def step_impl(context, processor_name, service_name): + odbc_service = ODBCService(name=service_name, connection_string="Driver={PostgreSQL ANSI};Server=postgresql-server;Port=5432;Database=postgres;Uid=postgres;Pwd=password;") processor = context.test.get_node_by_name(processor_name) processor.controller_services.append(odbc_service) processor.set_property("DB Controller Service", odbc_service.name) diff --git a/extensions/sql/data/JSONSQLWriter.cpp b/extensions/sql/data/JSONSQLWriter.cpp index 2d9fa2e..1c61a7b 100644 --- a/extensions/sql/data/JSONSQLWriter.cpp +++ b/extensions/sql/data/JSONSQLWriter.cpp @@ -21,7 +21,6 @@ #include "rapidjson/stringbuffer.h" #include "rapidjson/prettywriter.h" #include "Exception.h" -#include "Utils.h" namespace org { namespace apache { diff --git a/extensions/sql/data/MaxCollector.h b/extensions/sql/data/MaxCollector.h index c3c4b9e..eba61e1 100644 --- a/extensions/sql/data/MaxCollector.h +++ b/extensions/sql/data/MaxCollector.h @@ -25,6 +25,7 @@ #include <sstream> #include "SQLRowSubscriber.h" +#include "SQLColumnIdentifier.h" namespace org { namespace apache { @@ -45,7 +46,7 @@ class MaxCollector: public SQLRowSubscriber { for (const auto& expected : state_) { if (std::find(names.begin(), names.end(), expected.first) == names.end()) { throw minifi::Exception(PROCESSOR_EXCEPTION, - "Column '" + expected.first + "' is not found in the columns of '" + query_ + "' result."); + "Column '" + expected.first.str() + "' is not found in the columns of '" + query_ + "' result."); } } } @@ -75,7 +76,7 @@ class MaxCollector: public SQLRowSubscriber { template <typename T> class MaxValue { public: - void updateMaxValue(const std::string& column, const T& value) { + void updateMaxValue(const SQLColumnIdentifier& column, const T& value) { const auto it = column_maxima.find(column); if (it == column_maxima.end()) { column_maxima.emplace(column, value); @@ -87,7 +88,7 @@ class MaxCollector: public SQLRowSubscriber { } protected: - void updateStateImpl(std::unordered_map<std::string, std::string>& state) const { + void updateStateImpl(std::unordered_map<SQLColumnIdentifier, std::string>& state) const { for (auto& curr_column_max : state) { const auto it = column_maxima.find(curr_column_max.first); if (it != column_maxima.end()) { @@ -99,25 +100,26 @@ class MaxCollector: public SQLRowSubscriber { } private: - std::unordered_map<std::string, T> column_maxima; + std::unordered_map<SQLColumnIdentifier, T> column_maxima; }; template <typename ...Ts> struct MaxValues : public MaxValue<Ts>... { - void updateState(std::unordered_map<std::string, std::string>& state) const { + void updateState(std::unordered_map<SQLColumnIdentifier, std::string>& state) const { (void)(std::initializer_list<int>{(MaxValue<Ts>::updateStateImpl(state), 0)...}); } }; public: - MaxCollector(std::string query, std::unordered_map<std::string, std::string>& state) + MaxCollector(std::string query, std::unordered_map<SQLColumnIdentifier, std::string>& state) :query_(std::move(query)), state_(state) { } template <typename T> void updateMaxValue(const std::string& column_name, const T& value) { - if (state_.count(column_name)) { - max_values_.MaxValue<T>::updateMaxValue(column_name, value); + SQLColumnIdentifier column_id(column_name); + if (state_.count(column_id)) { + max_values_.MaxValue<T>::updateMaxValue(column_id, value); } } @@ -127,7 +129,7 @@ class MaxCollector: public SQLRowSubscriber { private: const std::string query_; - std::unordered_map<std::string, std::string>& state_; + std::unordered_map<SQLColumnIdentifier, std::string>& state_; MaxValues<std::string, double, int, long long, unsigned long long> max_values_; }; diff --git a/extensions/sql/data/Utils.cpp b/extensions/sql/data/SQLColumnIdentifier.cpp similarity index 57% rename from extensions/sql/data/Utils.cpp rename to extensions/sql/data/SQLColumnIdentifier.cpp index 33273e2..7a3097f 100644 --- a/extensions/sql/data/Utils.cpp +++ b/extensions/sql/data/SQLColumnIdentifier.cpp @@ -16,30 +16,26 @@ * limitations under the License. */ -#include "Utils.h" +#include "SQLColumnIdentifier.h" -#include <vector> -#include <string> +namespace org::apache::nifi::minifi::sql { -#include "utils/StringUtils.h" +SQLColumnIdentifier::SQLColumnIdentifier(std::string str) { + // foo, "foo", [foo], `foo` are identifiers in different servers + value_ = [&] () { + if (str.length() < 2) { + return str; + } + if ((str.front() == '"' && str.back() == '"') + || (str.front() == '[' && str.back() == ']') + || (str.front() == '`' && str.back() == '`')) { + return str.substr(1, str.length() - 2); + } + return str; + }(); -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { - -std::vector<std::string> inputStringToList(const std::string& str) { - std::vector<std::string> fragments = StringUtils::splitAndTrimRemovingEmpty(str, ","); - for (auto& item : fragments) { - item = StringUtils::toLower(item); - } - - return fragments; + original_value_ = std::move(str); } -} /* namespace utils */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ +} // namespace org::apache::nifi::minifi::sql + diff --git a/extensions/sql/data/SQLColumnIdentifier.h b/extensions/sql/data/SQLColumnIdentifier.h new file mode 100644 index 0000000..a22c603 --- /dev/null +++ b/extensions/sql/data/SQLColumnIdentifier.h @@ -0,0 +1,58 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <functional> + +namespace org::apache::nifi::minifi::sql { + +class SQLColumnIdentifier { + public: + explicit SQLColumnIdentifier(std::string str); + + std::string value() const { return value_; } + + std::string str() const { return original_value_; } + + bool operator==(const SQLColumnIdentifier &other) const { + return value_ == other.value_; + } + + bool operator==(const std::string& other) const { + return value_ == other; + } + + friend struct ::std::hash<SQLColumnIdentifier>; + + private: + std::string original_value_; + std::string value_; +}; + +} // namespace org::apache::nifi::minifi::sql + +namespace std { +template<> +struct hash<org::apache::nifi::minifi::sql::SQLColumnIdentifier> { + size_t operator()(const org::apache::nifi::minifi::sql::SQLColumnIdentifier &id) const { + return std::hash<std::string>{}(id.value_); + } +}; +} // namespace std diff --git a/extensions/sql/data/SQLRowsetProcessor.cpp b/extensions/sql/data/SQLRowsetProcessor.cpp index 3d3b5ad..c419484 100644 --- a/extensions/sql/data/SQLRowsetProcessor.cpp +++ b/extensions/sql/data/SQLRowsetProcessor.cpp @@ -19,7 +19,6 @@ #include "SQLRowsetProcessor.h" #include "Exception.h" -#include "Utils.h" #include "utils/StringUtils.h" namespace org { @@ -68,7 +67,7 @@ void SQLRowsetProcessor::addRow(const Row& row, size_t rowCount) { std::vector<std::string> column_names; column_names.reserve(row.size()); for (std::size_t i = 0; i != row.size(); ++i) { - column_names.push_back(utils::StringUtils::toLower(row.getColumnName(i))); + column_names.push_back(row.getColumnName(i)); } for (const auto& subscriber : row_subscribers_) { subscriber.get().processColumnNames(column_names); @@ -76,7 +75,7 @@ void SQLRowsetProcessor::addRow(const Row& row, size_t rowCount) { } for (std::size_t i = 0; i != row.size(); ++i) { - const auto& name = utils::StringUtils::toLower(row.getColumnName(i)); + const auto& name = row.getColumnName(i); if (row.isNull(i)) { processColumn(name, "NULL"); diff --git a/extensions/sql/data/Utils.h b/extensions/sql/data/Utils.h deleted file mode 100644 index d705e76..0000000 --- a/extensions/sql/data/Utils.h +++ /dev/null @@ -1,36 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <string> -#include <vector> - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { - -std::vector<std::string> inputStringToList(const std::string& str); - -} /* namespace utils */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ diff --git a/extensions/sql/processors/QueryDatabaseTable.cpp b/extensions/sql/processors/QueryDatabaseTable.cpp index b3749c7..cf2c8a7 100644 --- a/extensions/sql/processors/QueryDatabaseTable.cpp +++ b/extensions/sql/processors/QueryDatabaseTable.cpp @@ -35,7 +35,6 @@ #include "data/JSONSQLWriter.h" #include "data/SQLRowsetProcessor.h" #include "data/MaxCollector.h" -#include "data/Utils.h" #include "utils/StringUtils.h" namespace org { @@ -122,20 +121,26 @@ void QueryDatabaseTable::processOnSchedule(core::ProcessContext& context) { context.getProperty(TableName.getName(), table_name_); context.getProperty(WhereClause.getName(), extra_where_clause_); - max_value_columns_ = [&] { - std::string max_value_columns_str; - context.getProperty(MaxValueColumnNames.getName(), max_value_columns_str); - return utils::inputStringToList(max_value_columns_str); - }(); - return_columns_ = [&] { - std::string return_columns_str; - context.getProperty(ColumnNames.getName(), return_columns_str); - return utils::inputStringToList(return_columns_str); - }(); - queried_columns_ = utils::StringUtils::join(", ", return_columns_); - if (!queried_columns_.empty() && !max_value_columns_.empty()) { - // columns will be explicitly enumerated, we need to add the max value columns - queried_columns_ = queried_columns_ + ", " + utils::StringUtils::join(", ", max_value_columns_); + + return_columns_.clear(); + queried_columns_.clear(); + for (auto&& raw_col : utils::StringUtils::splitAndTrimRemovingEmpty(context.getProperty(ColumnNames).value_or(""), ",")) { + if (!queried_columns_.empty()) { + queried_columns_ += ", "; + } + queried_columns_ += raw_col; + return_columns_.insert(sql::SQLColumnIdentifier(std::move(raw_col))); + } + + max_value_columns_.clear(); + for (auto&& raw_col : utils::StringUtils::splitAndTrimRemovingEmpty(context.getProperty(MaxValueColumnNames).value_or(""), ",")) { + sql::SQLColumnIdentifier col_id(raw_col); + if (!queried_columns_.empty() && return_columns_.count(col_id) == 0) { + // columns will be explicitly enumerated, we need to add the max value columns as it is not yet queried + queried_columns_ += ", "; + queried_columns_ += raw_col; + } + max_value_columns_.push_back(std::move(col_id)); } initializeMaxValues(context); @@ -150,11 +155,10 @@ void QueryDatabaseTable::processOnTrigger(core::ProcessContext& /*context*/, cor auto rowset = statement->execute(); - std::unordered_map<std::string, std::string> new_max_values = max_values_; + std::unordered_map<sql::SQLColumnIdentifier, std::string> new_max_values = max_values_; sql::MaxCollector maxCollector{selectQuery, new_max_values}; auto column_filter = [&] (const std::string& column_name) { - return return_columns_.empty() - || std::find(return_columns_.begin(), return_columns_.end(), column_name) != return_columns_.end(); + return return_columns_.empty() || return_columns_.count(sql::SQLColumnIdentifier(column_name)) != 0; }; sql::JSONSQLWriter json_writer{output_format_ == OutputType::JSONPretty, column_filter}; FlowFileGenerator flow_file_creator{session, json_writer}; @@ -171,7 +175,7 @@ void QueryDatabaseTable::processOnTrigger(core::ProcessContext& /*context*/, cor for (auto& new_file : flow_file_creator.getFlowFiles()) { session.transfer(new_file, Success); for (const auto& max_column : max_value_columns_) { - new_file->addAttribute("maxvalue." + max_column, new_max_values[max_column]); + new_file->addAttribute("maxvalue." + max_column.str(), new_max_values[max_column]); } } @@ -182,7 +186,7 @@ void QueryDatabaseTable::processOnTrigger(core::ProcessContext& /*context*/, cor } bool QueryDatabaseTable::loadMaxValuesFromStoredState(const std::unordered_map<std::string, std::string> &state) { - std::unordered_map<std::string, std::string> new_max_values; + std::unordered_map<sql::SQLColumnIdentifier, std::string> new_max_values; if (state.count(TABLENAME_KEY) == 0) { logger_->log_info("State does not specify the table name."); return false; @@ -193,19 +197,19 @@ bool QueryDatabaseTable::loadMaxValuesFromStoredState(const std::unordered_map<s } for (auto& elem : state) { if (utils::StringUtils::startsWith(elem.first, MAXVALUE_KEY_PREFIX)) { - std::string column_name = elem.first.substr(MAXVALUE_KEY_PREFIX.length()); + sql::SQLColumnIdentifier column_name(elem.first.substr(MAXVALUE_KEY_PREFIX.length())); // add only those columns that we care about if (std::find(max_value_columns_.begin(), max_value_columns_.end(), column_name) != max_value_columns_.end()) { new_max_values.emplace(column_name, elem.second); } else { - logger_->log_info("State contains obsolete maximum-value column \"%s\", resetting state.", column_name); + logger_->log_info("State contains obsolete maximum-value column \"%s\", resetting state.", column_name.str()); return false; } } } for (auto& column : max_value_columns_) { if (new_max_values.find(column) == new_max_values.end()) { - logger_->log_info("New maximum-value column \"%s\" specified, resetting state.", column); + logger_->log_info("New maximum-value column \"%s\" specified, resetting state.", column.str()); return false; } } @@ -240,10 +244,10 @@ void QueryDatabaseTable::loadMaxValuesFromDynamicProperties(core::ProcessContext if (!utils::StringUtils::startsWith(key, InitialMaxValueDynamicPropertyPrefix)) { throw minifi::Exception(PROCESSOR_EXCEPTION, "QueryDatabaseTable: Unsupported dynamic property \"" + key + "\""); } - const auto column_name = utils::StringUtils::toLower(key.substr(InitialMaxValueDynamicPropertyPrefix.length())); + sql::SQLColumnIdentifier column_name(key.substr(InitialMaxValueDynamicPropertyPrefix.length())); auto it = max_values_.find(column_name); if (it == max_values_.end()) { - logger_->log_warn("Initial maximum value specified for column \"%s\", which is not specified as a Maximum-value Column. Ignoring.", column_name); + logger_->log_warn("Initial maximum value specified for column \"%s\", which is not specified as a Maximum-value Column. Ignoring.", column_name.str()); continue; } // do not overwrite existing max value @@ -253,7 +257,7 @@ void QueryDatabaseTable::loadMaxValuesFromDynamicProperties(core::ProcessContext std::string value; if (context.getDynamicProperty(key, value) && !value.empty()) { it->second = value; - logger_->log_info("Setting initial maximum value of %s to %s", column_name, value); + logger_->log_info("Setting initial maximum value of %s to %s", column_name.str(), value); } } } @@ -274,7 +278,7 @@ std::string QueryDatabaseTable::buildSelectQuery() { // Logic to differentiate ">" vs ">=" based on index is copied from: // https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java // (under comment "Add a condition for the WHERE clause"). And implementation explanation: https://issues.apache.org/jira/browse/NIFI-2712. - where_clauses.push_back(utils::StringUtils::join_pack(column_name, index == 0 ? " > " : " >= ", max_value)); + where_clauses.push_back(utils::StringUtils::join_pack(column_name.str(), index == 0 ? " > " : " >= ", max_value)); } if (!extra_where_clause_.empty()) { @@ -292,7 +296,7 @@ bool QueryDatabaseTable::saveState() { std::unordered_map<std::string, std::string> state_map; state_map.emplace(TABLENAME_KEY, table_name_); for (const auto& item : max_values_) { - state_map.emplace(MAXVALUE_KEY_PREFIX + item.first, item.second); + state_map.emplace(MAXVALUE_KEY_PREFIX + item.first.str(), item.second); } return state_manager_->set(state_map); } diff --git a/extensions/sql/processors/QueryDatabaseTable.h b/extensions/sql/processors/QueryDatabaseTable.h index b496bb6..70a9004 100644 --- a/extensions/sql/processors/QueryDatabaseTable.h +++ b/extensions/sql/processors/QueryDatabaseTable.h @@ -28,6 +28,7 @@ #include "core/ProcessSession.h" #include "SQLProcessor.h" #include "FlowFileSource.h" +#include "data/SQLColumnIdentifier.h" namespace org { namespace apache { @@ -82,11 +83,11 @@ class QueryDatabaseTable: public SQLProcessor, public FlowFileSource { std::shared_ptr<core::CoreComponentStateManager> state_manager_; std::string table_name_; - std::vector<std::string> return_columns_; + std::unordered_set<sql::SQLColumnIdentifier> return_columns_; std::string queried_columns_; std::string extra_where_clause_; - std::vector<std::string> max_value_columns_; - std::unordered_map<std::string, std::string> max_values_; + std::vector<sql::SQLColumnIdentifier> max_value_columns_; + std::unordered_map<sql::SQLColumnIdentifier, std::string> max_values_; }; } // namespace processors diff --git a/libminifi/test/sql-tests/SQLColumnIdentifierTests.cpp b/libminifi/test/sql-tests/SQLColumnIdentifierTests.cpp new file mode 100644 index 0000000..5486204 --- /dev/null +++ b/libminifi/test/sql-tests/SQLColumnIdentifierTests.cpp @@ -0,0 +1,56 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#undef NDEBUG + +#include "../TestBase.h" +#include "data/SQLColumnIdentifier.h" + +using org::apache::nifi::minifi::sql::SQLColumnIdentifier; + +TEST_CASE("Handles escaped identifiers") { + REQUIRE(SQLColumnIdentifier("Abc").value() == "Abc"); + REQUIRE(SQLColumnIdentifier("\"Abc\"").value() == "Abc"); // standard + REQUIRE(SQLColumnIdentifier("[Abc]").value() == "Abc"); // MS SQL + REQUIRE(SQLColumnIdentifier("`Abc`").value() == "Abc"); // MySQL + REQUIRE(SQLColumnIdentifier("\"").value() == "\""); // single char is ignored +} + +TEST_CASE("Can return the original representation") { + REQUIRE(SQLColumnIdentifier("Abc").str() == "Abc"); + REQUIRE(SQLColumnIdentifier("\"Abc\"").str() == "\"Abc\""); + REQUIRE(SQLColumnIdentifier("[Abc]").str() == "[Abc]"); + REQUIRE(SQLColumnIdentifier("`Abc`").str() == "`Abc`"); +} + +TEST_CASE("Equality is escape-agnostic") { + REQUIRE(SQLColumnIdentifier("Abc") == SQLColumnIdentifier("\"Abc\"")); + REQUIRE(SQLColumnIdentifier("\"Abc\"") == SQLColumnIdentifier("[Abc]")); + REQUIRE(SQLColumnIdentifier("[Abc]") == SQLColumnIdentifier("`Abc`")); + REQUIRE(SQLColumnIdentifier("\"Abc\"") == SQLColumnIdentifier("`Abc`")); +} + +TEST_CASE("Hashing is escape-agnostic") { + std::unordered_set<SQLColumnIdentifier> ids; + ids.insert(SQLColumnIdentifier("[Abc]")); + REQUIRE(ids.count(SQLColumnIdentifier("\"Abc\"")) == 1); + REQUIRE(ids.count(SQLColumnIdentifier("[Abc]")) == 1); + REQUIRE(ids.count(SQLColumnIdentifier("`Abc`")) == 1); + REQUIRE(ids.count(SQLColumnIdentifier("Abc")) == 1); + REQUIRE(ids.count(SQLColumnIdentifier("abc")) == 0); +} diff --git a/libminifi/test/sql-tests/mocks/MockConnectors.cpp b/libminifi/test/sql-tests/mocks/MockConnectors.cpp index 246c814..9400d11 100644 --- a/libminifi/test/sql-tests/mocks/MockConnectors.cpp +++ b/libminifi/test/sql-tests/mocks/MockConnectors.cpp @@ -171,11 +171,11 @@ std::unique_ptr<MockRowset> MockRowset::select(const std::vector<std::string>& c } std::unique_ptr<Rowset> MockDB::execute(const std::string& query, const std::vector<std::string>& args) { - if (minifi::utils::StringUtils::startsWith(query, "create table")) { + if (minifi::utils::StringUtils::startsWith(query, "create table", false)) { createTable(query); - } else if (minifi::utils::StringUtils::startsWith(query, "insert into")) { + } else if (minifi::utils::StringUtils::startsWith(query, "insert into", false)) { insertInto(query, args); - } else if (minifi::utils::StringUtils::startsWith(query, "select")) { + } else if (minifi::utils::StringUtils::startsWith(query, "select", false)) { return select(query, args); } else { throw std::runtime_error("Unknown query type"); @@ -186,7 +186,7 @@ std::unique_ptr<Rowset> MockDB::execute(const std::string& query, const std::vec void MockDB::createTable(const std::string& query) { std::smatch match; - std::regex expr("create table (\\w+)\\s*\\((.*)\\);"); + std::regex expr("create table (\\w+)\\s*\\((.*)\\);", std::regex_constants::icase); std::regex_search(query, match, expr); std::string table_name = match[1]; auto columns_with_type = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(match[2], ","); @@ -208,7 +208,7 @@ void MockDB::insertInto(const std::string& query, const std::vector<std::string> } std::smatch match; - std::regex expr("insert into (\\w+)\\s*(\\((.*)\\))*\\s*values\\s*\\((.+)\\)"); + std::regex expr("insert into (\\w+)\\s*(\\((.*)\\))*\\s*values\\s*\\((.+)\\)", std::regex_constants::icase); std::regex_search(replaced_query, match, expr); std::string table_name = match[1]; std::vector<std::string> values = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(match[4], ","); @@ -242,7 +242,7 @@ std::unique_ptr<Rowset> MockDB::select(const std::string& query, const std::vect } std::smatch match; - std::regex expr("select\\s+(.+)\\s+from\\s+(\\w+)\\s*(where ((.+(?= order by))|.+$))*\\s*(order by (.+))*"); + std::regex expr("select\\s+(.+)\\s+from\\s+(\\w+)\\s*(where ((.+(?= order by))|.+$))*\\s*(order by (.+))*", std::regex_constants::icase); std::regex_search(replaced_query, match, expr); auto cols = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(match[1], ","); if (cols[0] == "*") { @@ -257,7 +257,7 @@ std::unique_ptr<Rowset> MockDB::select(const std::string& query, const std::vect if (!order.empty()) { auto order_col_and_sort = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(order, " "); order_col = order_col_and_sort[0]; - descending = order_col_and_sort[1] == "desc"; + descending = minifi::utils::StringUtils::equalsIgnoreCase(order_col_and_sort[1], "desc"); } return tables_.at(table_name).select(cols, condition, order_col, !descending); } @@ -267,7 +267,13 @@ std::function<bool(const MockRow&)> MockDB::parseWhereCondition(const std::strin return [](const MockRow&){ return true; }; } - auto condition_strings = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(full_condition_str, "and"); + std::vector<std::string> condition_strings; + // TODO(adebreceni): let StringUtils::split* functions take either multiple delimiters or specify case sensitivity + for (auto&& condition : minifi::utils::StringUtils::splitAndTrimRemovingEmpty(full_condition_str, "and")) { + for (auto&& subcondition : minifi::utils::StringUtils::splitAndTrimRemovingEmpty(condition, "AND")) { + condition_strings.push_back(std::move(subcondition)); + } + } std::vector<std::function<bool(const MockRow&)>> condition_parts; for (const auto& condition_str : condition_strings) { if (condition_str.find(">") != std::string::npos) { @@ -367,9 +373,9 @@ void MockDB::storeDb() { } DataType MockDB::stringToDataType(const std::string& type_str) { - if (type_str == "integer") return DataType::INTEGER; - if (type_str == "text") return DataType::STRING; - if (type_str == "real") return DataType::DOUBLE; + if (utils::StringUtils::equalsIgnoreCase(type_str, "integer")) return DataType::INTEGER; + if (utils::StringUtils::equalsIgnoreCase(type_str, "text")) return DataType::STRING; + if (utils::StringUtils::equalsIgnoreCase(type_str, "real")) return DataType::DOUBLE; throw std::runtime_error("Unimplemented data type"); } diff --git a/libminifi/test/sql-tests/mocks/MockConnectors.h b/libminifi/test/sql-tests/mocks/MockConnectors.h index f677f91..b0f2aa6 100644 --- a/libminifi/test/sql-tests/mocks/MockConnectors.h +++ b/libminifi/test/sql-tests/mocks/MockConnectors.h @@ -130,7 +130,7 @@ class MockDB { class MockStatement : public Statement { public: explicit MockStatement(const std::string& query, const std::string& file_path) - : Statement(minifi::utils::StringUtils::toLower(query)), file_path_(file_path) { + : Statement(query), file_path_(file_path) { } std::unique_ptr<Rowset> execute(const std::vector<std::string>& args = {}) override;
