fgerlits commented on code in PR #1863:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1863#discussion_r1840339153


##########
extensions/python/pythonprocessors/nifiapi/recordtransform.py:
##########
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import traceback
+import json
+from abc import abstractmethod
+from minifi_native import ProcessContext, ProcessSession, Processor
+from .processorbase import ProcessorBase
+from .properties import FlowFile as FlowFileProxy
+from .properties import ProcessContext as ProcessContextProxy
+from .properties import PropertyDescriptor
+
+
+class __RecordTransformResult__:
+    def __init__(self, processor_result, recordJson):
+        self.processor_result = processor_result
+        self.recordJson = recordJson
+
+    def getRecordJson(self):
+        return self.recordJson
+
+    def getSchema(self):
+        return self.processor_result.schema
+
+    def getRelationship(self):
+        return self.processor_result.relationship
+
+    def getPartition(self):
+        return self.processor_result.partition
+
+
+class RecordTransformResult:
+    def __init__(self, record=None, schema=None, relationship="success", 
partition=None):
+        self.record = record
+        self.schema = schema
+        self.relationship = relationship
+        self.partition = partition
+
+    def getRecord(self):
+        return self.record
+
+    def getSchema(self):
+        return self.schema
+
+    def getRelationship(self):
+        return self.relationship
+
+    def getPartition(self):
+        return self.partition
+
+
+class RecordTransform(ProcessorBase):
+    RECORD_READER = PropertyDescriptor(
+        name='Record Reader',
+        display_name='Record Reader',
+        description='''Specifies the Controller Service to use for reading 
incoming data''',
+        required=True,
+        controller_service_definition='RecordSetReader'
+    )
+    RECORD_WRITER = PropertyDescriptor(
+        name='Record Writer',
+        display_name='Record Writer',
+        description='''Specifies the Controller Service to use for writing out 
the records''',
+        required=True,
+        controller_service_definition='RecordSetWriter',
+    )
+
+    def onInitialize(self, processor: Processor):
+        super(RecordTransform, self).onInitialize(processor)
+        processor.addProperty(self.RECORD_READER.name, 
self.RECORD_READER.description, None, self.RECORD_READER.required, False, 
False, None, None, self.RECORD_READER.controllerServiceDefinition)
+        processor.addProperty(self.RECORD_WRITER.name, 
self.RECORD_WRITER.description, None, self.RECORD_WRITER.required, False, 
False, None, None, self.RECORD_WRITER.controllerServiceDefinition)
+
+    def onTrigger(self, context: ProcessContext, session: ProcessSession):
+        flow_file = session.get()
+        if not flow_file:
+            return
+
+        context_proxy = ProcessContextProxy(context, self)
+        record_reader = 
context_proxy.getProperty(self.RECORD_READER).asControllerService()
+        if not record_reader:
+            self.logger.error("Record Reader property is invalid")
+            session.transfer(flow_file, self.REL_FAILURE)
+            return
+        record_writer = 
context_proxy.getProperty(self.RECORD_WRITER).asControllerService()
+        if not record_writer:
+            self.logger.error("Record Writer property is invalid")
+            session.transfer(flow_file, self.REL_FAILURE)
+            return
+
+        try:
+            record_list = record_reader.read(flow_file, session)
+            if record_list is None:
+                self.logger.error("Reading flow file records returned None")
+                session.transfer(flow_file, self.REL_FAILURE)
+                return
+        except Exception:
+            self.logger.error("Failed to read flow file records due to the 
following error:\n{}".format(traceback.format_exc()))
+            session.transfer(flow_file, self.REL_FAILURE)
+            return
+
+        flow_file_proxy = FlowFileProxy(session, flow_file)
+        results = []
+        for record in record_list:
+            record_json = json.loads(record)
+            try:
+                result = self.transform(context_proxy, record_json, None, 
flow_file_proxy)
+                result_record = result.getRecord()
+                resultjson = None if result_record is None else 
json.dumps(result_record)
+                results.append(__RecordTransformResult__(result, resultjson))
+            except Exception:
+                self.logger.error("Failed to transform record due to the 
following error:\n{}".format(traceback.format_exc()))
+                session.transfer(flow_file, self.REL_FAILURE)
+                return
+
+        partitions = []
+        partitioned_results_list = []
+        for result in results:
+            if result.getRecordJson() is None:
+                continue
+            record_partition = result.getPartition()
+            try:
+                partition_index = partitions.index(record_partition)
+                partitioned_results_list[partition_index].append(result)
+            except ValueError:
+                partitions.append(record_partition)
+                partitioned_results_list.append([result])

Review Comment:
   do we need exceptions here?
   ```suggestion
               if record_partition in partitions:
                   partition_index = partitions.index(record_partition)
                   partitioned_results_list[partition_index].append(result)
               else:
                   partitions.append(record_partition)
                   partitioned_results_list.append([result])
   ```
   would look better to me



##########
extensions/python/types/PyRecordSetReader.cpp:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PyRecordSetReader.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/stream.h"
+
+#include "PyProcessSession.h"
+#include "PyScriptFlowFile.h"
+
+extern "C" {
+namespace org::apache::nifi::minifi::extensions::python {
+
+static PyMethodDef PyRecordSetReader_methods[] = {  // 
NOLINT(cppcoreguidelines-avoid-c-arrays)
+    {"read", (PyCFunction) PyRecordSetReader::read, METH_VARARGS, nullptr},
+    {}  /* Sentinel */
+};
+
+static PyType_Slot PyRecordSetReaderTypeSpecSlots[] = {  // 
NOLINT(cppcoreguidelines-avoid-c-arrays)
+    {Py_tp_dealloc, 
reinterpret_cast<void*>(pythonAllocatedInstanceDealloc<PyRecordSetReader>)},
+    {Py_tp_init, reinterpret_cast<void*>(PyRecordSetReader::init)},
+    {Py_tp_methods, reinterpret_cast<void*>(PyRecordSetReader_methods)},
+    {Py_tp_new, 
reinterpret_cast<void*>(newPythonAllocatedInstance<PyRecordSetReader>)},
+    {}  /* Sentinel */
+};
+
+static PyType_Spec PyRecordSetReaderTypeSpec{
+    .name = "minifi_native.RecordSetReader",
+    .basicsize = sizeof(PyRecordSetReader),
+    .itemsize = 0,
+    .flags = Py_TPFLAGS_DEFAULT,
+    .slots = PyRecordSetReaderTypeSpecSlots
+};
+
+int PyRecordSetReader::init(PyRecordSetReader* self, PyObject* args, 
PyObject*) {
+  gsl_Expects(self && args);
+  PyObject* weak_ptr_capsule = nullptr;
+  if (!PyArg_ParseTuple(args, "O", &weak_ptr_capsule)) {
+    return -1;
+  }
+
+  auto record_set_reader = PyCapsule_GetPointer(weak_ptr_capsule, 
HeldTypeName);
+  if (!record_set_reader)
+    return -1;
+  self->record_set_reader_ = *static_cast<HeldType*>(record_set_reader);
+  return 0;
+}
+
+PyObject* PyRecordSetReader::read(PyRecordSetReader* self, PyObject* args) {
+  gsl_Expects(self && args);
+  auto record_set_reader = self->record_set_reader_.lock();
+  if (!record_set_reader) {
+    PyErr_SetString(PyExc_AttributeError, "tried reading ssl context service 
outside 'on_trigger'");

Review Comment:
   ```suggestion
       PyErr_SetString(PyExc_AttributeError, "tried reading record set reader 
outside 'on_trigger'");
   ```



##########
libminifi/src/core/RecordField.cpp:
##########
@@ -0,0 +1,99 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more

Review Comment:
   very minor, but a space is missing from the start of this line



##########
extensions/python/pythonprocessors/nifiapi/recordtransform.py:
##########
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import traceback
+import json
+from abc import abstractmethod
+from minifi_native import ProcessContext, ProcessSession, Processor
+from .processorbase import ProcessorBase
+from .properties import FlowFile as FlowFileProxy
+from .properties import ProcessContext as ProcessContextProxy
+from .properties import PropertyDescriptor
+
+
+class __RecordTransformResult__:
+    def __init__(self, processor_result, recordJson):
+        self.processor_result = processor_result
+        self.recordJson = recordJson
+
+    def getRecordJson(self):
+        return self.recordJson
+
+    def getSchema(self):
+        return self.processor_result.schema
+
+    def getRelationship(self):
+        return self.processor_result.relationship
+
+    def getPartition(self):
+        return self.processor_result.partition
+
+
+class RecordTransformResult:
+    def __init__(self, record=None, schema=None, relationship="success", 
partition=None):
+        self.record = record
+        self.schema = schema
+        self.relationship = relationship
+        self.partition = partition
+
+    def getRecord(self):
+        return self.record
+
+    def getSchema(self):
+        return self.schema
+
+    def getRelationship(self):
+        return self.relationship
+
+    def getPartition(self):
+        return self.partition
+
+
+class RecordTransform(ProcessorBase):
+    RECORD_READER = PropertyDescriptor(
+        name='Record Reader',
+        display_name='Record Reader',
+        description='''Specifies the Controller Service to use for reading 
incoming data''',
+        required=True,
+        controller_service_definition='RecordSetReader'
+    )
+    RECORD_WRITER = PropertyDescriptor(
+        name='Record Writer',
+        display_name='Record Writer',
+        description='''Specifies the Controller Service to use for writing out 
the records''',
+        required=True,
+        controller_service_definition='RecordSetWriter',
+    )
+
+    def onInitialize(self, processor: Processor):
+        super(RecordTransform, self).onInitialize(processor)
+        processor.addProperty(self.RECORD_READER.name, 
self.RECORD_READER.description, None, self.RECORD_READER.required, False, 
False, None, None, self.RECORD_READER.controllerServiceDefinition)
+        processor.addProperty(self.RECORD_WRITER.name, 
self.RECORD_WRITER.description, None, self.RECORD_WRITER.required, False, 
False, None, None, self.RECORD_WRITER.controllerServiceDefinition)
+
+    def onTrigger(self, context: ProcessContext, session: ProcessSession):
+        flow_file = session.get()
+        if not flow_file:
+            return
+
+        context_proxy = ProcessContextProxy(context, self)
+        record_reader = 
context_proxy.getProperty(self.RECORD_READER).asControllerService()
+        if not record_reader:
+            self.logger.error("Record Reader property is invalid")
+            session.transfer(flow_file, self.REL_FAILURE)
+            return
+        record_writer = 
context_proxy.getProperty(self.RECORD_WRITER).asControllerService()
+        if not record_writer:
+            self.logger.error("Record Writer property is invalid")
+            session.transfer(flow_file, self.REL_FAILURE)
+            return
+
+        try:
+            record_list = record_reader.read(flow_file, session)
+            if record_list is None:
+                self.logger.error("Reading flow file records returned None")
+                session.transfer(flow_file, self.REL_FAILURE)
+                return
+        except Exception:
+            self.logger.error("Failed to read flow file records due to the 
following error:\n{}".format(traceback.format_exc()))
+            session.transfer(flow_file, self.REL_FAILURE)
+            return
+
+        flow_file_proxy = FlowFileProxy(session, flow_file)
+        results = []
+        for record in record_list:
+            record_json = json.loads(record)
+            try:
+                result = self.transform(context_proxy, record_json, None, 
flow_file_proxy)
+                result_record = result.getRecord()
+                resultjson = None if result_record is None else 
json.dumps(result_record)
+                results.append(__RecordTransformResult__(result, resultjson))
+            except Exception:
+                self.logger.error("Failed to transform record due to the 
following error:\n{}".format(traceback.format_exc()))
+                session.transfer(flow_file, self.REL_FAILURE)
+                return
+
+        partitions = []
+        partitioned_results_list = []
+        for result in results:
+            if result.getRecordJson() is None:
+                continue
+            record_partition = result.getPartition()
+            try:
+                partition_index = partitions.index(record_partition)
+                partitioned_results_list[partition_index].append(result)
+            except ValueError:
+                partitions.append(record_partition)
+                partitioned_results_list.append([result])
+
+        for single_partition_results in partitioned_results_list:
+            partitioned_flow_file = session.create(flow_file)
+            record_writer.write([result.getRecordJson() for result in 
single_partition_results], partitioned_flow_file, session)
+            if result.getRelationship() == "success":
+                session.transfer(partitioned_flow_file, self.REL_SUCCESS)
+            else:
+                session.transferToCustomRelationship(partitioned_flow_file, 
result.getRelationship())
+
+        session.transfer(flow_file, self.REL_ORIGINAL)
+
+    @abstractmethod
+    def transform(self, context: ProcessContextProxy, flowFile: FlowFileProxy) 
-> RecordTransformResult:
+        pass

Review Comment:
   Why does this take 2 parameters only? I would expect it to take 4 parameters.



##########
docker/test/integration/features/steps/steps.py:
##########
@@ -401,14 +403,29 @@ def step_impl(context, processor_name):
     processor.set_property('SSL Context Service', ssl_context_service.name)
 
 
-# RecordSetWriters
+# Record set reader and writer
 @given("a JsonRecordSetWriter controller service is set up for 
{processor_name}")
 def step_impl(context, processor_name):
     json_record_set_writer = JsonRecordSetWriter()
 
     processor = context.test.get_node_by_name(processor_name)
     processor.controller_services.append(json_record_set_writer)
     processor.set_property('Record Set Writer', json_record_set_writer.name)
+    processor.set_property('Record Writer', json_record_set_writer.name)
+
+
+@given("a JsonRecordSetWriter controller service is set up")
+def step_impl(context):
+    json_record_set_writer = JsonRecordSetWriter(name="JsonRecordSetWriter", 
output_grouping="Array")
+    container = context.test.acquire_container(context=context, 
name="minifi-cpp-flow")
+    container.add_controller(json_record_set_writer)

Review Comment:
   I like the idea of decoupling the setup of the controller service from 
setting the property. Can we remove the old "a JsonRecordSetWriter controller 
service is set up for {processor_name}" step (I think it's only used in one 
place), and replace it with this new step?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to