lordgamez commented on code in PR #1763: URL: https://github.com/apache/nifi-minifi-cpp/pull/1763#discussion_r1592471104
########## extensions/standard-processors/controllers/JsonRecordSetReader.h: ########## @@ -0,0 +1,58 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "Record.h" +#include "controllers/RecordSetReader.h" +#include "core/FlowFile.h" +#include "core/ProcessSession.h" + +namespace org::apache::nifi::minifi::standard { + +class JsonRecordSetReader final : public core::RecordSetReader { + public: + explicit JsonRecordSetReader(const std::string& name, const utils::Identifier& uuid = {}); + explicit JsonRecordSetReader(const std::string& name, const std::shared_ptr<Configure>& configuration); + + JsonRecordSetReader(JsonRecordSetReader&&) = delete; + JsonRecordSetReader(const JsonRecordSetReader&) = delete; + JsonRecordSetReader& operator=(JsonRecordSetReader&&) = delete; + JsonRecordSetReader& operator=(const JsonRecordSetReader&) = delete; + + ~JsonRecordSetReader() override = default; + + EXTENSIONAPI static constexpr const char* Description = "Parses JSON into individual Record objects. 
" + "While the reader expects each record to be well-formed JSON, the content of a FlowFile may consist of many records, " + "each as a well-formed JSON array or JSON object with optional whitespace between them, such as the common 'JSON-per-line' format. " + "If an array is encountered, each element in that array will be treated as a separate record. " + "If the schema that is configured contains a field that is not present in the JSON, a null value will be used. " + "If the JSON contains a field that is not present in the schema, that field will be skipped."; + + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES + + using RecordSetReader::RecordSetReader; + + nonstd::expected<core::RecordSet, std::error_code> read(const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) override; + + void yield() override {} + bool isRunning() const override { return getState() == core::controller::ControllerServiceState::ENABLED; } Review Comment: What's the reason for the multiple spaces before `return`? ########## extensions/standard-processors/controllers/JsonRecordSetWriter.h: ########## @@ -0,0 +1,93 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "PropertyDefinitionBuilder.h" +#include "Record.h" +#include "controllers/RecordSetWriter.h" +#include "core/FlowFile.h" +#include "core/ProcessSession.h" +#include "utils/Enum.h" + +namespace org::apache::nifi::minifi::standard { + +enum class OutputGroupingType { + Array, + OneLinePerObject +}; + +class JsonRecordSetWriter final : public core::RecordSetWriter { + public: + explicit JsonRecordSetWriter(const std::string& name, const utils::Identifier& uuid = {}); + explicit JsonRecordSetWriter(const std::string& name, const std::shared_ptr<Configure>& configuration); + + JsonRecordSetWriter(JsonRecordSetWriter&&) = delete; + JsonRecordSetWriter(const JsonRecordSetWriter&) = delete; + JsonRecordSetWriter& operator=(JsonRecordSetWriter&&) = delete; + JsonRecordSetWriter& operator=(const JsonRecordSetWriter&) = delete; + + ~JsonRecordSetWriter() override = default; + + EXTENSIONAPI static constexpr const char* Description = + "Writes the results of a RecordSet as either a JSON Array or one JSON object per line. " + "If using Array output, then even if the RecordSet consists of a single row, it will be written as an array with a single element. " + "If using One Line Per Object output, the JSON objects cannot be pretty-printed."; + + EXTENSIONAPI static constexpr auto OutputGrouping = core::PropertyDefinitionBuilder<magic_enum::enum_count<OutputGroupingType>()>::createProperty("Output Grouping") + .withDescription("Specifies how the writer should output the JSON records (as an array or one object per line, e.g.) " + "Note that if 'One Line Per Object' is selected, then Pretty Print JSON must be false.") Review Comment: I'm not sure that "Pretty Print JSON must be false" holds in this case; I think the option is simply ignored. Also, I think the allowed value is actually `OneLinePerObject`, since the enum names use the default converted values. 
Should we customize the value names? ########## extensions/standard-processors/controllers/JsonRecordSetWriter.h: ########## @@ -0,0 +1,93 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "PropertyDefinitionBuilder.h" +#include "Record.h" +#include "controllers/RecordSetWriter.h" +#include "core/FlowFile.h" +#include "core/ProcessSession.h" +#include "utils/Enum.h" + +namespace org::apache::nifi::minifi::standard { + +enum class OutputGroupingType { + Array, + OneLinePerObject +}; + +class JsonRecordSetWriter final : public core::RecordSetWriter { + public: + explicit JsonRecordSetWriter(const std::string& name, const utils::Identifier& uuid = {}); + explicit JsonRecordSetWriter(const std::string& name, const std::shared_ptr<Configure>& configuration); + + JsonRecordSetWriter(JsonRecordSetWriter&&) = delete; + JsonRecordSetWriter(const JsonRecordSetWriter&) = delete; + JsonRecordSetWriter& operator=(JsonRecordSetWriter&&) = delete; + JsonRecordSetWriter& operator=(const JsonRecordSetWriter&) = delete; + + ~JsonRecordSetWriter() override = default; + + EXTENSIONAPI static constexpr const char* Description = + "Writes the results of a RecordSet as either a JSON Array or one JSON object per 
line. " + "If using Array output, then even if the RecordSet consists of a single row, it will be written as an array with a single element. " + "If using One Line Per Object output, the JSON objects cannot be pretty-printed."; + + EXTENSIONAPI static constexpr auto OutputGrouping = core::PropertyDefinitionBuilder<magic_enum::enum_count<OutputGroupingType>()>::createProperty("Output Grouping") + .withDescription("Specifies how the writer should output the JSON records (as an array or one object per line, e.g.) " + "Note that if 'One Line Per Object' is selected, then Pretty Print JSON must be false.") + .withDefaultValue(magic_enum::enum_name(OutputGroupingType::Array)) + .withAllowedValues(magic_enum::enum_names<OutputGroupingType>()) + .supportsExpressionLanguage(false) + .isRequired(true) + .build(); + + EXTENSIONAPI static constexpr auto PrettyPrint = core::PropertyDefinitionBuilder<>::createProperty("Pretty Print JSON") + .withDescription("Specifies whether or not the JSON should be pretty printed") Review Comment: Maybe this description could be extended with something like "... only used when Array output is selected". ########## libminifi/test/TestRecord.h: ########## Review Comment: These record examples are only used in JsonRecordTests, and the contents of records 1 and 2 only make sense in the context of that file. I think these functions should either be moved there or be parameterized with data, so that other tests can use them to create arbitrary records. ########## extensions/standard-processors/controllers/JsonRecordSetReader.h: ########## Review Comment: Both JsonRecordSetReader and JsonRecordSetWriter should be documented in CONTROLLERS.md. Maybe we could also add a general "how to" on implementing record set readers and writers. 
########## extensions/standard-processors/tests/unit/JsonRecordTests.cpp: ########## @@ -0,0 +1,145 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.c + */ + +#include <numbers> +#include <variant> + +#include "Catch.h" +#include "RecordSetTesters.h" +#include "TestBase.h" +#include "TestRecord.h" +#include "controllers/JsonRecordSetReader.h" +#include "controllers/JsonRecordSetWriter.h" +#include "core/Record.h" + +namespace org::apache::nifi::minifi::standard::test { + +constexpr std::string_view record_per_line_str = R"({"baz":3.14,"qux":[true,false,true],"is_test":true,"bar":123,"quux":{"Aprikose":"apricot","Birne":"pear","Apfel":"apple"},"foo":"asd","when":"2012-07-01T09:53:00Z"} +{"baz":3.141592653589793,"qux":[false,false,true],"is_test":true,"bar":98402134,"quux":{"Aprikose":"abricot","Birne":"poire","Apfel":"pomme"},"foo":"Lorem ipsum dolor sit amet, consectetur adipiscing elit.","when":"2022-11-01T19:52:11Z"} +)"; +constexpr std::string_view array_compressed_str = 
R"([{"baz":3.14,"qux":[true,false,true],"is_test":true,"bar":123,"quux":{"Aprikose":"apricot","Birne":"pear","Apfel":"apple"},"foo":"asd","when":"2012-07-01T09:53:00Z"},{"baz":3.141592653589793,"qux":[false,false,true],"is_test":true,"bar":98402134,"quux":{"Aprikose":"abricot","Birne":"poire","Apfel":"pomme"},"foo":"Lorem ipsum dolor sit amet, consectetur adipiscing elit.","when":"2022-11-01T19:52:11Z"}])"; +constexpr std::string_view array_pretty_str = R"([ + { + "baz": 3.14, + "qux": [ + true, + false, + true + ], + "is_test": true, + "bar": 123, + "quux": { + "Aprikose": "apricot", + "Birne": "pear", + "Apfel": "apple" + }, + "foo": "asd", + "when": "2012-07-01T09:53:00Z" + }, + { + "baz": 3.141592653589793, + "qux": [ + false, + false, + true + ], + "is_test": true, + "bar": 98402134, + "quux": { + "Aprikose": "abricot", + "Birne": "poire", + "Apfel": "pomme" + }, + "foo": "Lorem ipsum dolor sit amet, consectetur adipiscing elit.", + "when": "2022-11-01T19:52:11Z" + } +])"; + +bool test_json_equality(const std::string_view expected_str, const std::string_view actual_str) { Review Comment: The function name should use camelCase ########## extensions/standard-processors/tests/unit/JsonRecordTests.cpp: ########## @@ -0,0 +1,145 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License.c + */ + +#include <numbers> +#include <variant> + +#include "Catch.h" +#include "RecordSetTesters.h" +#include "TestBase.h" +#include "TestRecord.h" +#include "controllers/JsonRecordSetReader.h" +#include "controllers/JsonRecordSetWriter.h" +#include "core/Record.h" + +namespace org::apache::nifi::minifi::standard::test { + +constexpr std::string_view record_per_line_str = R"({"baz":3.14,"qux":[true,false,true],"is_test":true,"bar":123,"quux":{"Aprikose":"apricot","Birne":"pear","Apfel":"apple"},"foo":"asd","when":"2012-07-01T09:53:00Z"} +{"baz":3.141592653589793,"qux":[false,false,true],"is_test":true,"bar":98402134,"quux":{"Aprikose":"abricot","Birne":"poire","Apfel":"pomme"},"foo":"Lorem ipsum dolor sit amet, consectetur adipiscing elit.","when":"2022-11-01T19:52:11Z"} +)"; +constexpr std::string_view array_compressed_str = R"([{"baz":3.14,"qux":[true,false,true],"is_test":true,"bar":123,"quux":{"Aprikose":"apricot","Birne":"pear","Apfel":"apple"},"foo":"asd","when":"2012-07-01T09:53:00Z"},{"baz":3.141592653589793,"qux":[false,false,true],"is_test":true,"bar":98402134,"quux":{"Aprikose":"abricot","Birne":"poire","Apfel":"pomme"},"foo":"Lorem ipsum dolor sit amet, consectetur adipiscing elit.","when":"2022-11-01T19:52:11Z"}])"; +constexpr std::string_view array_pretty_str = R"([ + { + "baz": 3.14, + "qux": [ + true, + false, + true + ], + "is_test": true, + "bar": 123, + "quux": { + "Aprikose": "apricot", + "Birne": "pear", + "Apfel": "apple" + }, + "foo": "asd", + "when": "2012-07-01T09:53:00Z" + }, + { + "baz": 3.141592653589793, + "qux": [ + false, + false, + true + ], + "is_test": true, + "bar": 98402134, + "quux": { + "Aprikose": "abricot", + "Birne": "poire", + "Apfel": "pomme" + }, + "foo": "Lorem ipsum dolor sit amet, consectetur adipiscing elit.", + "when": "2022-11-01T19:52:11Z" + } +])"; + +bool test_json_equality(const std::string_view 
expected_str, const std::string_view actual_str) { + rapidjson::Document expected; + expected.Parse(expected_str.data()); + rapidjson::Document actual; + actual.Parse(actual_str.data()); + return actual == expected; +} + +TEST_CASE("JsonRecordSetWriter test Record Per Line") { + core::RecordSet record_set; + record_set.push_back(core::test::createSampleRecord()); + record_set.push_back(core::test::createSampleRecord2()); + + JsonRecordSetWriter json_record_set_writer; + json_record_set_writer.initialize(); + CHECK(json_record_set_writer.setProperty(JsonRecordSetWriter::OutputGrouping, "OneLinePerObject")); + json_record_set_writer.onEnable(); + CHECK(core::test::testRecordWriter(json_record_set_writer, record_set, [](auto serialized_record_set) -> bool { + return test_json_equality(record_per_line_str, serialized_record_set); + })); +} + +TEST_CASE("JsonRecordSetWriter test array") { + core::RecordSet record_set; + record_set.push_back(core::test::createSampleRecord()); + record_set.push_back(core::test::createSampleRecord2()); + + JsonRecordSetWriter json_record_set_writer; + CHECK(core::test::testRecordWriter(json_record_set_writer, record_set, [](auto serialized_record_set) -> bool { + return test_json_equality(array_compressed_str, serialized_record_set); + })); +} + +TEST_CASE("JsonRecordSetWriter test pretty array") { + core::RecordSet record_set; + record_set.push_back(core::test::createSampleRecord()); + record_set.push_back(core::test::createSampleRecord2()); + + JsonRecordSetWriter json_record_set_writer; + json_record_set_writer.initialize(); + CHECK(json_record_set_writer.setProperty(JsonRecordSetWriter::PrettyPrint, "true")); + json_record_set_writer.onEnable(); + CHECK(core::test::testRecordWriter(json_record_set_writer, record_set, [](auto serialized_record_set) -> bool { + return test_json_equality(array_pretty_str, serialized_record_set); + })); +} + +TEST_CASE("JsonRecordSetReader per line") { Review Comment: It seems that only the input is 
different for all reader tests, could we merge them and use catch2 generators instead? ########## libminifi/test/RecordSetTesters.h: ########## @@ -0,0 +1,82 @@ +/** +* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "controllers/RecordSetReader.h" +#include "controllers/RecordSetWriter.h" +#include "core/Record.h" +#include "TestBase.h" + +namespace org::apache::nifi::minifi::core::test { + +class RecordSetFixture { + public: + explicit RecordSetFixture(TestController::PlanConfig config = {}): plan_config_(std::move(config)) {} + + [[nodiscard]] ProcessSession& processSession() const { return *process_session_; } + + [[nodiscard]] const Relationship& getRelationship() const { return relationship_; } + private: + TestController test_controller_{}; + TestController::PlanConfig plan_config_{}; + std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan(plan_config_); + std::shared_ptr<Processor> dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor"); + std::shared_ptr<ProcessContext> context_ = [this] { test_plan_->runNextProcessor(); return test_plan_->getCurrentContext(); }(); + std::unique_ptr<ProcessSession> process_session_ = 
std::make_unique<ProcessSession>(context_); + + const Relationship relationship_{"success", "description"}; +}; + +bool testRecordWriter(RecordSetWriter& record_set_writer, const RecordSet& record_set, auto tester) { + const RecordSetFixture fixture; + ProcessSession& process_session = fixture.processSession(); + + const auto flow_file = process_session.create(); + + record_set_writer.write(record_set, flow_file, process_session); + process_session.transfer(flow_file, fixture.getRelationship()); + process_session.commit(); + const auto input_stream = process_session.getFlowFileContentStream(*flow_file); + std::array<std::byte, 2048> buffer{}; + const auto buffer_size = input_stream->read(buffer); + const std::string flow_file_content(reinterpret_cast<char*>(buffer.data()), buffer_size); + + return tester(flow_file_content); +} + +inline bool testRecordReader(RecordSetReader& record_set_reader, const std::string_view serialized_record_set, const RecordSet& expected_record_set) { + const RecordSetFixture fixture; + ProcessSession& process_session = fixture.processSession(); + + const auto flow_file = process_session.create(); + process_session.writeBuffer(flow_file, serialized_record_set); + process_session.transfer(flow_file, fixture.getRelationship()); + process_session.commit(); + + const auto record_set = record_set_reader.read(flow_file, process_session); + if (!record_set) + return false; + auto& record_set_0 = (*record_set)[0]; + auto& expected_set_0 = expected_record_set[0]; + CHECK(record_set_0 == expected_set_0); Review Comment: Why are we checking the first element separately? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
