Github user arpadboda commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/404#discussion_r222244653
--- Diff: python/minifi/__init__.py ---
@@ -0,0 +1,203 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ctypes import cdll
+import ctypes
+from abc import abstractmethod
+
+
+
+class RPG_PORT(ctypes.Structure):
+ _fields_ = [('port_id', ctypes.c_char_p)]
+
+class NIFI_STRUCT(ctypes.Structure):
+ _fields_ = [('instancePtr', ctypes.c_void_p),
+ ('port', RPG_PORT)]
+
+class CFlow(ctypes.Structure):
+ _fields_ = [('plan', ctypes.c_void_p)]
+
+class CFlowFile(ctypes.Structure):
+ _fields_ = [('size', ctypes.c_int),
+ ('in', ctypes.c_void_p),
+ ('contentLocation', ctypes.c_char_p),
+ ('attributes', ctypes.c_void_p),
+ ('ffp', ctypes.c_void_p)]
+
+class CProcessor(ctypes.Structure):
+ _fields_ = [('processor_ptr', ctypes.c_void_p)]
+
+class CProcessSession(ctypes.Structure):
+ _fields_ = [('process_session', ctypes.c_void_p)]
+
+CALLBACK = ctypes.CFUNCTYPE(None, ctypes.POINTER(CProcessSession))
+
+class Processor(object):
+ def __init__(self, cprocessor, minifi):
+ super(Processor, self).__init__()
+ self._proc = cprocessor
+ self._minifi = minifi
+
+ def set_property(self, name, value):
+ self._minifi.set_property( self._proc, name.encode("UTF-8"),
value.encode("UTF-8"))
+
+class PyProcessor(object):
+ def __init__(self, instance, minifi, flow):
+ super(PyProcessor, self).__init__()
+ self._instance = instance
+ self._minifi = minifi
+ self._flow = flow
+
+ def setBase(self, proc):
+ self._proc = proc
+
+ def get(self, session):
+ ff = self._minifi.get(self._instance.get_instance(),self._flow,
session)
+ return FlowFile(self._minifi, ff)
+
+ def transfer(self, session, ff, rel):
+ self._minifi.transfer(session, self._flow, rel.encode("UTF-8"))
+
+ @abstractmethod
+ def _onTriggerCallback(self):
+ pass
+
+ def getTriggerCallback(self):
+ if self._callback is None:
+ print("creating ptr")
+ self._callback = self._onTriggerCallback()
+ return self._callback
+
+ @abstractmethod
+ def onSchedule(self):
+ pass
+
+
+class RPG(object):
+ def __init__(self, nifi_struct):
+ super(RPG, self).__init__()
+ self._nifi = nifi_struct
+
+ def get_instance(self):
+ return self._nifi
+
+class FlowFile(object):
+ def __init__(self, minifi, ff):
+ super(FlowFile, self).__init__()
+ self._minifi = minifi
+ self._ff = ff
+
+ def add_attribute(self, name, value):
+ vallen = len(value)
+ self._minifi.add_attribute(self._ff, name.encode("UTF-8"),
value.encode("UTF-8"), vallen)
+
+ def get_instance(self):
+ return self._ff
+
+
+
+class MiNiFi(object):
+ """ Proxy Connector """
+ def __init__(self, dll_file, url, port):
+ super(MiNiFi, self).__init__()
+ self._minifi= cdll.LoadLibrary(dll_file)
+ """ create instance """
+ self._minifi.create_instance.argtypes = [ctypes.c_char_p ,
ctypes.POINTER(RPG_PORT)]
+ self._minifi.create_instance.restype = ctypes.POINTER(NIFI_STRUCT)
+ """ create port """
+ self._minifi.create_port.argtype = ctypes.c_char_p
+ self._minifi.create_port.restype = ctypes.POINTER(RPG_PORT)
+ """ free port """
+ self._minifi.free_port.argtype = ctypes.POINTER(RPG_PORT)
+ self._minifi.free_port.restype = ctypes.c_int
+ """ create new flow """
+ self._minifi.create_new_flow.argtype = ctypes.POINTER(NIFI_STRUCT)
+ self._minifi.create_new_flow.restype = ctypes.POINTER(CFlow)
+ """ add processor """
+ self._minifi.add_processor.argtypes = [ctypes.POINTER(CFlow) ,
ctypes.c_char_p ]
+ self._minifi.add_processor.restype = ctypes.POINTER(CProcessor)
+ """ set processor property"""
+ self._minifi.set_property.argtypes = [ctypes.POINTER(CProcessor) ,
ctypes.c_char_p , ctypes.c_char_p ]
+ self._minifi.set_property.restype = ctypes.c_int
+ """ set instance property"""
+ self._minifi.set_instance_property.argtypes =
[ctypes.POINTER(NIFI_STRUCT) , ctypes.c_char_p , ctypes.c_char_p ]
+ self._minifi.set_instance_property.restype = ctypes.c_int
+ """ get next flow file """
+ self._minifi.get_next_flow_file.argtypes =
[ctypes.POINTER(NIFI_STRUCT) , ctypes.POINTER(CFlow) ]
+ self._minifi.get_next_flow_file.restype = ctypes.POINTER(CFlowFile)
+ """ transmit flow file """
+ self._minifi.transmit_flowfile.argtypes =
[ctypes.POINTER(CFlowFile) , ctypes.POINTER(NIFI_STRUCT) ]
+ self._minifi.transmit_flowfile.restype = ctypes.c_int
+ """ get ff """
+ self._minifi.get.argtypes = [ctypes.POINTER(NIFI_STRUCT) ,
ctypes.POINTER(CFlow), ctypes.POINTER(CProcessSession) ]
+ self._minifi.get.restype = ctypes.POINTER(CFlowFile)
+ """ add python processor """
+ self._minifi.add_python_processor.argtypes =
[ctypes.POINTER(CFlow) , ctypes.c_void_p ]
+ self._minifi.add_python_processor.restype =
ctypes.POINTER(CProcessor)
+ """ transfer ff """
+ self._minifi.transfer.argtypes = [ctypes.POINTER(CProcessSession),
ctypes.POINTER(CFlow) , ctypes.c_char_p ]
+ self._minifi.transfer.restype = ctypes.c_int
+ """ add attribute to ff """
+ self._minifi.add_attribute.argtypes = [ctypes.POINTER(CFlowFile),
ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int ]
+ self._minifi.add_attribute.restype = ctypes.c_int
+
+ self._minifi.init_api.argtype = ctypes.c_char_p
+ self._minifi.init_api.restype = ctypes.c_int
+ self._minifi.init_api(dll_file.encode("UTF-8"))
+
+ self._instance = self.__open_rpg(url,port)
+ self._flow = self._minifi.create_new_flow(
self._instance.get_instance() )
+ self._minifi.enable_logging()
+
+
+
+ def __open_rpg(self, url, port):
+ rpgPort = self._minifi.create_port(port)
+ rpg = self._minifi.create_instance(url, rpgPort)
+ ret = RPG(rpg)
+ return ret
+
+ def get_c_lib(self):
+ return self._minifi
+
+ def set_property(self, name, value):
+
self._minifi.set_instance_property(self._instance.get_instance(),
name.encode("UTF-8"), value.encode("UTF-8"))
+
+
+ def add_processor(self, processor):
+ proc = self._minifi.add_processor(self._flow,
processor.get_name().encode("UTF-8"))
+ return Processor(proc,self._minifi)
+
+ def create_python_processor(self, module, processor):
+ m =
getattr(module,processor)(self._instance,self._minifi,self._flow)
+ proc = self._minifi.add_python_processor(self._flow,
m.getTriggerCallback())
+ m.setBase(proc)
+ return m
+
+ def get_next_flowfile(self):
+ ff =
self._minifi.get_next_flow_file(self._instance.get_instance(), self._flow)
+ return FlowFile(self._minifi, ff)
+
+ def transmit_flowfile(self, ff):
+ if ff.get_instance():
+
self._minifi.transmit_flowfile(ff.get_instance(),self._instance.get_instance())
+
+class GetFile(object):
+ def __init__(self):
+ super(GetFile, self).__init__()
+
+ def get_name(self):
+ return "GetFile"
--- End diff --
Please unify indentation in this file (either 2 or 4 spaces)
---