lordgamez commented on code in PR #1763:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1763#discussion_r1592471104


##########
extensions/standard-processors/controllers/JsonRecordSetReader.h:
##########
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "Record.h"
+#include "controllers/RecordSetReader.h"
+#include "core/FlowFile.h"
+#include "core/ProcessSession.h"
+
+namespace org::apache::nifi::minifi::standard {
+
+class JsonRecordSetReader final : public core::RecordSetReader {
+ public:
+  explicit JsonRecordSetReader(const std::string& name, const 
utils::Identifier& uuid = {});
+  explicit JsonRecordSetReader(const std::string& name, const 
std::shared_ptr<Configure>& configuration);
+
+  JsonRecordSetReader(JsonRecordSetReader&&) = delete;
+  JsonRecordSetReader(const JsonRecordSetReader&) = delete;
+  JsonRecordSetReader& operator=(JsonRecordSetReader&&) = delete;
+  JsonRecordSetReader& operator=(const JsonRecordSetReader&) = delete;
+
+  ~JsonRecordSetReader() override = default;
+
+  EXTENSIONAPI static constexpr const char* Description = "Parses JSON into 
individual Record objects. "
+    "While the reader expects each record to be well-formed JSON, the content 
of a FlowFile may consist of many records, "
+    "each as a well-formed JSON array or JSON object with optional whitespace 
between them, such as the common 'JSON-per-line' format. "
+    "If an array is encountered, each element in that array will be treated as 
a separate record. "
+    "If the schema that is configured contains a field that is not present in 
the JSON, a null value will be used. "
+    "If the JSON contains a field that is not present in the schema, that 
field will be skipped.";
+
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
+
+  using RecordSetReader::RecordSetReader;
+
+  nonstd::expected<core::RecordSet, std::error_code> read(const 
std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) 
override;
+
+  void yield() override {}
+  bool isRunning() const override {    return getState() == 
core::controller::ControllerServiceState::ENABLED; }

Review Comment:
   What's the reason for the multiple spaces before `return`?



##########
extensions/standard-processors/controllers/JsonRecordSetWriter.h:
##########
@@ -0,0 +1,93 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "PropertyDefinitionBuilder.h"
+#include "Record.h"
+#include "controllers/RecordSetWriter.h"
+#include "core/FlowFile.h"
+#include "core/ProcessSession.h"
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::standard {
+
+enum class OutputGroupingType {
+  Array,
+  OneLinePerObject
+};
+
+class JsonRecordSetWriter final : public core::RecordSetWriter {
+ public:
+  explicit JsonRecordSetWriter(const std::string& name, const 
utils::Identifier& uuid = {});
+  explicit JsonRecordSetWriter(const std::string& name, const 
std::shared_ptr<Configure>& configuration);
+
+  JsonRecordSetWriter(JsonRecordSetWriter&&) = delete;
+  JsonRecordSetWriter(const JsonRecordSetWriter&) = delete;
+  JsonRecordSetWriter& operator=(JsonRecordSetWriter&&) = delete;
+  JsonRecordSetWriter& operator=(const JsonRecordSetWriter&) = delete;
+
+  ~JsonRecordSetWriter() override = default;
+
+  EXTENSIONAPI static constexpr const char* Description =
+      "Writes the results of a RecordSet as either a JSON Array or one JSON 
object per line. "
+      "If using Array output, then even if the RecordSet consists of a single 
row, it will be written as an array with a single element. "
+      "If using One Line Per Object output, the JSON objects cannot be 
pretty-printed.";
+
+  EXTENSIONAPI static constexpr auto OutputGrouping = 
core::PropertyDefinitionBuilder<magic_enum::enum_count<OutputGroupingType>()>::createProperty("Output
 Grouping")
+    .withDescription("Specifies how the writer should output the JSON records 
(as an array or one object per line, e.g.) "
+                    "Note that if 'One Line Per Object' is selected, then 
Pretty Print JSON must be false.")

Review Comment:
   I'm not sure in this case "Pretty Print JSON must be false", I think the 
option is rather ignored. Also I think it's the option is `OneLinePerObject` as 
the enum names are using the default converted values. Should we customize the 
value names?



##########
extensions/standard-processors/controllers/JsonRecordSetWriter.h:
##########
@@ -0,0 +1,93 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "PropertyDefinitionBuilder.h"
+#include "Record.h"
+#include "controllers/RecordSetWriter.h"
+#include "core/FlowFile.h"
+#include "core/ProcessSession.h"
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::standard {
+
+enum class OutputGroupingType {
+  Array,
+  OneLinePerObject
+};
+
+class JsonRecordSetWriter final : public core::RecordSetWriter {
+ public:
+  explicit JsonRecordSetWriter(const std::string& name, const 
utils::Identifier& uuid = {});
+  explicit JsonRecordSetWriter(const std::string& name, const 
std::shared_ptr<Configure>& configuration);
+
+  JsonRecordSetWriter(JsonRecordSetWriter&&) = delete;
+  JsonRecordSetWriter(const JsonRecordSetWriter&) = delete;
+  JsonRecordSetWriter& operator=(JsonRecordSetWriter&&) = delete;
+  JsonRecordSetWriter& operator=(const JsonRecordSetWriter&) = delete;
+
+  ~JsonRecordSetWriter() override = default;
+
+  EXTENSIONAPI static constexpr const char* Description =
+      "Writes the results of a RecordSet as either a JSON Array or one JSON 
object per line. "
+      "If using Array output, then even if the RecordSet consists of a single 
row, it will be written as an array with a single element. "
+      "If using One Line Per Object output, the JSON objects cannot be 
pretty-printed.";
+
+  EXTENSIONAPI static constexpr auto OutputGrouping = 
core::PropertyDefinitionBuilder<magic_enum::enum_count<OutputGroupingType>()>::createProperty("Output
 Grouping")
+    .withDescription("Specifies how the writer should output the JSON records 
(as an array or one object per line, e.g.) "
+                    "Note that if 'One Line Per Object' is selected, then 
Pretty Print JSON must be false.")
+    .withDefaultValue(magic_enum::enum_name(OutputGroupingType::Array))
+    .withAllowedValues(magic_enum::enum_names<OutputGroupingType>())
+    .supportsExpressionLanguage(false)
+    .isRequired(true)
+    .build();
+
+  EXTENSIONAPI static constexpr auto PrettyPrint = 
core::PropertyDefinitionBuilder<>::createProperty("Pretty Print JSON")
+    .withDescription("Specifies whether or not the JSON should be pretty 
printed")

Review Comment:
   Maybe this can be appended with something like "... only used when Array 
output is selected"



##########
libminifi/test/TestRecord.h:
##########


Review Comment:
   These record examples are only used in the JsonRecordTests and the content 
of record 1 and 2 only makes sense in the context of that file, I think these 
functions should be moved there or make them parameterized with data, to be 
able to create any generic record object for other tests.



##########
extensions/standard-processors/controllers/JsonRecordSetReader.h:
##########


Review Comment:
   Both JsonRecordSetReader and JsonRecordSetWriter should be documented in 
CONTROLLERS.md. Maybe we could add some information on the general 
implementation "how to" of the recordset readers and writers.



##########
extensions/standard-processors/tests/unit/JsonRecordTests.cpp:
##########
@@ -0,0 +1,145 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.c
+ */
+
+#include <numbers>
+#include <variant>
+
+#include "Catch.h"
+#include "RecordSetTesters.h"
+#include "TestBase.h"
+#include "TestRecord.h"
+#include "controllers/JsonRecordSetReader.h"
+#include "controllers/JsonRecordSetWriter.h"
+#include "core/Record.h"
+
+namespace org::apache::nifi::minifi::standard::test {
+
+constexpr std::string_view record_per_line_str = 
R"({"baz":3.14,"qux":[true,false,true],"is_test":true,"bar":123,"quux":{"Aprikose":"apricot","Birne":"pear","Apfel":"apple"},"foo":"asd","when":"2012-07-01T09:53:00Z"}
+{"baz":3.141592653589793,"qux":[false,false,true],"is_test":true,"bar":98402134,"quux":{"Aprikose":"abricot","Birne":"poire","Apfel":"pomme"},"foo":"Lorem
 ipsum dolor sit amet, consectetur adipiscing 
elit.","when":"2022-11-01T19:52:11Z"}
+)";
+constexpr std::string_view array_compressed_str = 
R"([{"baz":3.14,"qux":[true,false,true],"is_test":true,"bar":123,"quux":{"Aprikose":"apricot","Birne":"pear","Apfel":"apple"},"foo":"asd","when":"2012-07-01T09:53:00Z"},{"baz":3.141592653589793,"qux":[false,false,true],"is_test":true,"bar":98402134,"quux":{"Aprikose":"abricot","Birne":"poire","Apfel":"pomme"},"foo":"Lorem
 ipsum dolor sit amet, consectetur adipiscing 
elit.","when":"2022-11-01T19:52:11Z"}])";
+constexpr std::string_view array_pretty_str = R"([
+    {
+        "baz": 3.14,
+        "qux": [
+            true,
+            false,
+            true
+        ],
+        "is_test": true,
+        "bar": 123,
+        "quux": {
+            "Aprikose": "apricot",
+            "Birne": "pear",
+            "Apfel": "apple"
+        },
+        "foo": "asd",
+        "when": "2012-07-01T09:53:00Z"
+    },
+    {
+        "baz": 3.141592653589793,
+        "qux": [
+            false,
+            false,
+            true
+        ],
+        "is_test": true,
+        "bar": 98402134,
+        "quux": {
+            "Aprikose": "abricot",
+            "Birne": "poire",
+            "Apfel": "pomme"
+        },
+        "foo": "Lorem ipsum dolor sit amet, consectetur adipiscing elit.",
+        "when": "2022-11-01T19:52:11Z"
+    }
+])";
+
+bool test_json_equality(const std::string_view expected_str, const 
std::string_view actual_str) {

Review Comment:
   The function name should use camelCase



##########
extensions/standard-processors/tests/unit/JsonRecordTests.cpp:
##########
@@ -0,0 +1,145 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.c
+ */
+
+#include <numbers>
+#include <variant>
+
+#include "Catch.h"
+#include "RecordSetTesters.h"
+#include "TestBase.h"
+#include "TestRecord.h"
+#include "controllers/JsonRecordSetReader.h"
+#include "controllers/JsonRecordSetWriter.h"
+#include "core/Record.h"
+
+namespace org::apache::nifi::minifi::standard::test {
+
+constexpr std::string_view record_per_line_str = 
R"({"baz":3.14,"qux":[true,false,true],"is_test":true,"bar":123,"quux":{"Aprikose":"apricot","Birne":"pear","Apfel":"apple"},"foo":"asd","when":"2012-07-01T09:53:00Z"}
+{"baz":3.141592653589793,"qux":[false,false,true],"is_test":true,"bar":98402134,"quux":{"Aprikose":"abricot","Birne":"poire","Apfel":"pomme"},"foo":"Lorem
 ipsum dolor sit amet, consectetur adipiscing 
elit.","when":"2022-11-01T19:52:11Z"}
+)";
+constexpr std::string_view array_compressed_str = 
R"([{"baz":3.14,"qux":[true,false,true],"is_test":true,"bar":123,"quux":{"Aprikose":"apricot","Birne":"pear","Apfel":"apple"},"foo":"asd","when":"2012-07-01T09:53:00Z"},{"baz":3.141592653589793,"qux":[false,false,true],"is_test":true,"bar":98402134,"quux":{"Aprikose":"abricot","Birne":"poire","Apfel":"pomme"},"foo":"Lorem
 ipsum dolor sit amet, consectetur adipiscing 
elit.","when":"2022-11-01T19:52:11Z"}])";
+constexpr std::string_view array_pretty_str = R"([
+    {
+        "baz": 3.14,
+        "qux": [
+            true,
+            false,
+            true
+        ],
+        "is_test": true,
+        "bar": 123,
+        "quux": {
+            "Aprikose": "apricot",
+            "Birne": "pear",
+            "Apfel": "apple"
+        },
+        "foo": "asd",
+        "when": "2012-07-01T09:53:00Z"
+    },
+    {
+        "baz": 3.141592653589793,
+        "qux": [
+            false,
+            false,
+            true
+        ],
+        "is_test": true,
+        "bar": 98402134,
+        "quux": {
+            "Aprikose": "abricot",
+            "Birne": "poire",
+            "Apfel": "pomme"
+        },
+        "foo": "Lorem ipsum dolor sit amet, consectetur adipiscing elit.",
+        "when": "2022-11-01T19:52:11Z"
+    }
+])";
+
+bool test_json_equality(const std::string_view expected_str, const 
std::string_view actual_str) {
+  rapidjson::Document expected;
+  expected.Parse(expected_str.data());
+  rapidjson::Document actual;
+  actual.Parse(actual_str.data());
+  return actual == expected;
+}
+
+TEST_CASE("JsonRecordSetWriter test Record Per Line") {
+  core::RecordSet record_set;
+  record_set.push_back(core::test::createSampleRecord());
+  record_set.push_back(core::test::createSampleRecord2());
+
+  JsonRecordSetWriter json_record_set_writer;
+  json_record_set_writer.initialize();
+  
CHECK(json_record_set_writer.setProperty(JsonRecordSetWriter::OutputGrouping, 
"OneLinePerObject"));
+  json_record_set_writer.onEnable();
+  CHECK(core::test::testRecordWriter(json_record_set_writer, record_set, 
[](auto serialized_record_set) -> bool {
+    return test_json_equality(record_per_line_str, serialized_record_set);
+  }));
+}
+
+TEST_CASE("JsonRecordSetWriter test array") {
+  core::RecordSet record_set;
+  record_set.push_back(core::test::createSampleRecord());
+  record_set.push_back(core::test::createSampleRecord2());
+
+  JsonRecordSetWriter json_record_set_writer;
+  CHECK(core::test::testRecordWriter(json_record_set_writer, record_set, 
[](auto serialized_record_set) -> bool {
+    return test_json_equality(array_compressed_str, serialized_record_set);
+  }));
+}
+
+TEST_CASE("JsonRecordSetWriter test pretty array") {
+  core::RecordSet record_set;
+  record_set.push_back(core::test::createSampleRecord());
+  record_set.push_back(core::test::createSampleRecord2());
+
+  JsonRecordSetWriter json_record_set_writer;
+  json_record_set_writer.initialize();
+  CHECK(json_record_set_writer.setProperty(JsonRecordSetWriter::PrettyPrint, 
"true"));
+  json_record_set_writer.onEnable();
+  CHECK(core::test::testRecordWriter(json_record_set_writer, record_set, 
[](auto serialized_record_set) -> bool {
+    return test_json_equality(array_pretty_str, serialized_record_set);
+  }));
+}
+
+TEST_CASE("JsonRecordSetReader per line") {

Review Comment:
   It seems that only the input is different for all reader tests, could we 
merge them and use catch2 generators instead?



##########
libminifi/test/RecordSetTesters.h:
##########
@@ -0,0 +1,82 @@
+/**
+*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "controllers/RecordSetReader.h"
+#include "controllers/RecordSetWriter.h"
+#include "core/Record.h"
+#include "TestBase.h"
+
+namespace org::apache::nifi::minifi::core::test {
+
+class RecordSetFixture {
+ public:
+  explicit RecordSetFixture(TestController::PlanConfig config = {}): 
plan_config_(std::move(config)) {}
+
+  [[nodiscard]] ProcessSession& processSession() const { return 
*process_session_; }
+
+  [[nodiscard]] const Relationship& getRelationship() const { return 
relationship_; }
+ private:
+  TestController test_controller_{};
+  TestController::PlanConfig plan_config_{};
+  std::shared_ptr<TestPlan> test_plan_ = 
test_controller_.createPlan(plan_config_);
+  std::shared_ptr<Processor> dummy_processor_ = 
test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
+  std::shared_ptr<ProcessContext> context_ = [this] { 
test_plan_->runNextProcessor(); return test_plan_->getCurrentContext(); }();
+  std::unique_ptr<ProcessSession> process_session_ = 
std::make_unique<ProcessSession>(context_);
+
+  const Relationship relationship_{"success", "description"};
+};
+
+bool testRecordWriter(RecordSetWriter& record_set_writer, const RecordSet& 
record_set, auto tester) {
+  const RecordSetFixture fixture;
+  ProcessSession& process_session = fixture.processSession();
+
+  const auto flow_file = process_session.create();
+
+  record_set_writer.write(record_set, flow_file, process_session);
+  process_session.transfer(flow_file, fixture.getRelationship());
+  process_session.commit();
+  const auto input_stream = 
process_session.getFlowFileContentStream(*flow_file);
+  std::array<std::byte, 2048> buffer{};
+  const auto buffer_size = input_stream->read(buffer);
+  const std::string flow_file_content(reinterpret_cast<char*>(buffer.data()), 
buffer_size);
+
+  return tester(flow_file_content);
+}
+
+inline bool testRecordReader(RecordSetReader& record_set_reader, const 
std::string_view serialized_record_set, const RecordSet& expected_record_set) {
+  const RecordSetFixture fixture;
+  ProcessSession& process_session = fixture.processSession();
+
+  const auto flow_file = process_session.create();
+  process_session.writeBuffer(flow_file, serialized_record_set);
+  process_session.transfer(flow_file, fixture.getRelationship());
+  process_session.commit();
+
+  const auto record_set = record_set_reader.read(flow_file, process_session);
+  if (!record_set)
+    return false;
+  auto& record_set_0 = (*record_set)[0];
+  auto& expected_set_0 = expected_record_set[0];
+  CHECK(record_set_0 == expected_set_0);

Review Comment:
   Why are we checking the first element separately?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to