lordgamez commented on code in PR #1863:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1863#discussion_r1842114295


##########
docker/test/integration/features/steps/steps.py:
##########
@@ -401,14 +403,29 @@ def step_impl(context, processor_name):
     processor.set_property('SSL Context Service', ssl_context_service.name)
 
 
-# RecordSetWriters
+# Record set reader and writer
 @given("a JsonRecordSetWriter controller service is set up for 
{processor_name}")
 def step_impl(context, processor_name):
     json_record_set_writer = JsonRecordSetWriter()
 
     processor = context.test.get_node_by_name(processor_name)
     processor.controller_services.append(json_record_set_writer)
     processor.set_property('Record Set Writer', json_record_set_writer.name)
+    processor.set_property('Record Writer', json_record_set_writer.name)
+
+
+@given("a JsonRecordSetWriter controller service is set up")
+def step_impl(context):
+    json_record_set_writer = JsonRecordSetWriter(name="JsonRecordSetWriter", 
output_grouping="Array")
+    container = context.test.acquire_container(context=context, 
name="minifi-cpp-flow")
+    container.add_controller(json_record_set_writer)

Review Comment:
   Good idea, updated in ddaff1ec9ab40fb6070d0274bba7bb4b9e226e13



##########
extensions/python/pythonprocessors/nifiapi/recordtransform.py:
##########
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import traceback
+import json
+from abc import abstractmethod
+from minifi_native import ProcessContext, ProcessSession, Processor
+from .processorbase import ProcessorBase
+from .properties import FlowFile as FlowFileProxy
+from .properties import ProcessContext as ProcessContextProxy
+from .properties import PropertyDescriptor
+
+
+class __RecordTransformResult__:
+    def __init__(self, processor_result, recordJson):
+        self.processor_result = processor_result
+        self.recordJson = recordJson
+
+    def getRecordJson(self):
+        return self.recordJson
+
+    def getSchema(self):
+        return self.processor_result.schema
+
+    def getRelationship(self):
+        return self.processor_result.relationship
+
+    def getPartition(self):
+        return self.processor_result.partition
+
+
+class RecordTransformResult:
+    def __init__(self, record=None, schema=None, relationship="success", 
partition=None):
+        self.record = record
+        self.schema = schema
+        self.relationship = relationship
+        self.partition = partition
+
+    def getRecord(self):
+        return self.record
+
+    def getSchema(self):
+        return self.schema
+
+    def getRelationship(self):
+        return self.relationship
+
+    def getPartition(self):
+        return self.partition
+
+
+class RecordTransform(ProcessorBase):
+    RECORD_READER = PropertyDescriptor(
+        name='Record Reader',
+        display_name='Record Reader',
+        description='''Specifies the Controller Service to use for reading 
incoming data''',
+        required=True,
+        controller_service_definition='RecordSetReader'
+    )
+    RECORD_WRITER = PropertyDescriptor(
+        name='Record Writer',
+        display_name='Record Writer',
+        description='''Specifies the Controller Service to use for writing out 
the records''',
+        required=True,
+        controller_service_definition='RecordSetWriter',
+    )
+
+    def onInitialize(self, processor: Processor):
+        super(RecordTransform, self).onInitialize(processor)
+        processor.addProperty(self.RECORD_READER.name, 
self.RECORD_READER.description, None, self.RECORD_READER.required, False, 
False, None, None, self.RECORD_READER.controllerServiceDefinition)
+        processor.addProperty(self.RECORD_WRITER.name, 
self.RECORD_WRITER.description, None, self.RECORD_WRITER.required, False, 
False, None, None, self.RECORD_WRITER.controllerServiceDefinition)
+
+    def onTrigger(self, context: ProcessContext, session: ProcessSession):
+        flow_file = session.get()
+        if not flow_file:
+            return
+
+        context_proxy = ProcessContextProxy(context, self)
+        record_reader = 
context_proxy.getProperty(self.RECORD_READER).asControllerService()
+        if not record_reader:
+            self.logger.error("Record Reader property is invalid")
+            session.transfer(flow_file, self.REL_FAILURE)
+            return
+        record_writer = 
context_proxy.getProperty(self.RECORD_WRITER).asControllerService()
+        if not record_writer:
+            self.logger.error("Record Writer property is invalid")
+            session.transfer(flow_file, self.REL_FAILURE)
+            return
+
+        try:
+            record_list = record_reader.read(flow_file, session)
+            if record_list is None:
+                self.logger.error("Reading flow file records returned None")
+                session.transfer(flow_file, self.REL_FAILURE)
+                return
+        except Exception:
+            self.logger.error("Failed to read flow file records due to the 
following error:\n{}".format(traceback.format_exc()))
+            session.transfer(flow_file, self.REL_FAILURE)
+            return
+
+        flow_file_proxy = FlowFileProxy(session, flow_file)
+        results = []
+        for record in record_list:
+            record_json = json.loads(record)
+            try:
+                result = self.transform(context_proxy, record_json, None, 
flow_file_proxy)
+                result_record = result.getRecord()
+                resultjson = None if result_record is None else 
json.dumps(result_record)
+                results.append(__RecordTransformResult__(result, resultjson))
+            except Exception:
+                self.logger.error("Failed to transform record due to the 
following error:\n{}".format(traceback.format_exc()))
+                session.transfer(flow_file, self.REL_FAILURE)
+                return
+
+        partitions = []
+        partitioned_results_list = []
+        for result in results:
+            if result.getRecordJson() is None:
+                continue
+            record_partition = result.getPartition()
+            try:
+                partition_index = partitions.index(record_partition)
+                partitioned_results_list[partition_index].append(result)
+            except ValueError:
+                partitions.append(record_partition)
+                partitioned_results_list.append([result])

Review Comment:
   Makes sense, updated in ddaff1ec9ab40fb6070d0274bba7bb4b9e226e13



##########
extensions/python/pythonprocessors/nifiapi/recordtransform.py:
##########
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import traceback
+import json
+from abc import abstractmethod
+from minifi_native import ProcessContext, ProcessSession, Processor
+from .processorbase import ProcessorBase
+from .properties import FlowFile as FlowFileProxy
+from .properties import ProcessContext as ProcessContextProxy
+from .properties import PropertyDescriptor
+
+
+class __RecordTransformResult__:
+    def __init__(self, processor_result, recordJson):
+        self.processor_result = processor_result
+        self.recordJson = recordJson
+
+    def getRecordJson(self):
+        return self.recordJson
+
+    def getSchema(self):
+        return self.processor_result.schema
+
+    def getRelationship(self):
+        return self.processor_result.relationship
+
+    def getPartition(self):
+        return self.processor_result.partition
+
+
+class RecordTransformResult:
+    def __init__(self, record=None, schema=None, relationship="success", 
partition=None):
+        self.record = record
+        self.schema = schema
+        self.relationship = relationship
+        self.partition = partition
+
+    def getRecord(self):
+        return self.record
+
+    def getSchema(self):
+        return self.schema
+
+    def getRelationship(self):
+        return self.relationship
+
+    def getPartition(self):
+        return self.partition
+
+
+class RecordTransform(ProcessorBase):
+    RECORD_READER = PropertyDescriptor(
+        name='Record Reader',
+        display_name='Record Reader',
+        description='''Specifies the Controller Service to use for reading 
incoming data''',
+        required=True,
+        controller_service_definition='RecordSetReader'
+    )
+    RECORD_WRITER = PropertyDescriptor(
+        name='Record Writer',
+        display_name='Record Writer',
+        description='''Specifies the Controller Service to use for writing out 
the records''',
+        required=True,
+        controller_service_definition='RecordSetWriter',
+    )
+
+    def onInitialize(self, processor: Processor):
+        super(RecordTransform, self).onInitialize(processor)
+        processor.addProperty(self.RECORD_READER.name, 
self.RECORD_READER.description, None, self.RECORD_READER.required, False, 
False, None, None, self.RECORD_READER.controllerServiceDefinition)
+        processor.addProperty(self.RECORD_WRITER.name, 
self.RECORD_WRITER.description, None, self.RECORD_WRITER.required, False, 
False, None, None, self.RECORD_WRITER.controllerServiceDefinition)
+
+    def onTrigger(self, context: ProcessContext, session: ProcessSession):
+        flow_file = session.get()
+        if not flow_file:
+            return
+
+        context_proxy = ProcessContextProxy(context, self)
+        record_reader = 
context_proxy.getProperty(self.RECORD_READER).asControllerService()
+        if not record_reader:
+            self.logger.error("Record Reader property is invalid")
+            session.transfer(flow_file, self.REL_FAILURE)
+            return
+        record_writer = 
context_proxy.getProperty(self.RECORD_WRITER).asControllerService()
+        if not record_writer:
+            self.logger.error("Record Writer property is invalid")
+            session.transfer(flow_file, self.REL_FAILURE)
+            return
+
+        try:
+            record_list = record_reader.read(flow_file, session)
+            if record_list is None:
+                self.logger.error("Reading flow file records returned None")
+                session.transfer(flow_file, self.REL_FAILURE)
+                return
+        except Exception:
+            self.logger.error("Failed to read flow file records due to the 
following error:\n{}".format(traceback.format_exc()))
+            session.transfer(flow_file, self.REL_FAILURE)
+            return
+
+        flow_file_proxy = FlowFileProxy(session, flow_file)
+        results = []
+        for record in record_list:
+            record_json = json.loads(record)
+            try:
+                result = self.transform(context_proxy, record_json, None, 
flow_file_proxy)
+                result_record = result.getRecord()
+                resultjson = None if result_record is None else 
json.dumps(result_record)
+                results.append(__RecordTransformResult__(result, resultjson))
+            except Exception:
+                self.logger.error("Failed to transform record due to the 
following error:\n{}".format(traceback.format_exc()))
+                session.transfer(flow_file, self.REL_FAILURE)
+                return
+
+        partitions = []
+        partitioned_results_list = []
+        for result in results:
+            if result.getRecordJson() is None:
+                continue
+            record_partition = result.getPartition()
+            try:
+                partition_index = partitions.index(record_partition)
+                partitioned_results_list[partition_index].append(result)
+            except ValueError:
+                partitions.append(record_partition)
+                partitioned_results_list.append([result])
+
+        for single_partition_results in partitioned_results_list:
+            partitioned_flow_file = session.create(flow_file)
+            record_writer.write([result.getRecordJson() for result in 
single_partition_results], partitioned_flow_file, session)
+            if result.getRelationship() == "success":
+                session.transfer(partitioned_flow_file, self.REL_SUCCESS)
+            else:
+                session.transferToCustomRelationship(partitioned_flow_file, 
result.getRelationship())
+
+        session.transfer(flow_file, self.REL_ORIGINAL)
+
+    @abstractmethod
+    def transform(self, context: ProcessContextProxy, flowFile: FlowFileProxy) 
-> RecordTransformResult:
+        pass

Review Comment:
   Good catch, fixed in ddaff1ec9ab40fb6070d0274bba7bb4b9e226e13



##########
extensions/python/types/PyRecordSetReader.cpp:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PyRecordSetReader.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/stream.h"
+
+#include "PyProcessSession.h"
+#include "PyScriptFlowFile.h"
+
+extern "C" {
+namespace org::apache::nifi::minifi::extensions::python {
+
+static PyMethodDef PyRecordSetReader_methods[] = {  // 
NOLINT(cppcoreguidelines-avoid-c-arrays)
+    {"read", (PyCFunction) PyRecordSetReader::read, METH_VARARGS, nullptr},
+    {}  /* Sentinel */
+};
+
+static PyType_Slot PyRecordSetReaderTypeSpecSlots[] = {  // 
NOLINT(cppcoreguidelines-avoid-c-arrays)
+    {Py_tp_dealloc, 
reinterpret_cast<void*>(pythonAllocatedInstanceDealloc<PyRecordSetReader>)},
+    {Py_tp_init, reinterpret_cast<void*>(PyRecordSetReader::init)},
+    {Py_tp_methods, reinterpret_cast<void*>(PyRecordSetReader_methods)},
+    {Py_tp_new, 
reinterpret_cast<void*>(newPythonAllocatedInstance<PyRecordSetReader>)},
+    {}  /* Sentinel */
+};
+
+static PyType_Spec PyRecordSetReaderTypeSpec{
+    .name = "minifi_native.RecordSetReader",
+    .basicsize = sizeof(PyRecordSetReader),
+    .itemsize = 0,
+    .flags = Py_TPFLAGS_DEFAULT,
+    .slots = PyRecordSetReaderTypeSpecSlots
+};
+
+int PyRecordSetReader::init(PyRecordSetReader* self, PyObject* args, 
PyObject*) {
+  gsl_Expects(self && args);
+  PyObject* weak_ptr_capsule = nullptr;
+  if (!PyArg_ParseTuple(args, "O", &weak_ptr_capsule)) {
+    return -1;
+  }
+
+  auto record_set_reader = PyCapsule_GetPointer(weak_ptr_capsule, 
HeldTypeName);
+  if (!record_set_reader)
+    return -1;
+  self->record_set_reader_ = *static_cast<HeldType*>(record_set_reader);
+  return 0;
+}
+
+PyObject* PyRecordSetReader::read(PyRecordSetReader* self, PyObject* args) {
+  gsl_Expects(self && args);
+  auto record_set_reader = self->record_set_reader_.lock();
+  if (!record_set_reader) {
+    PyErr_SetString(PyExc_AttributeError, "tried reading ssl context service 
outside 'on_trigger'");

Review Comment:
   Updated in ddaff1ec9ab40fb6070d0274bba7bb4b9e226e13



##########
libminifi/src/core/RecordField.cpp:
##########
@@ -0,0 +1,99 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more

Review Comment:
   Updated in ddaff1ec9ab40fb6070d0274bba7bb4b9e226e13



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to