I pushed the current work to the core repo under streampipes-wrapper-python [1]. I also created a Jira issue [2] to track the tasks for adding the wrapper.
As mentioned, this still relies heavily on its Java counterpart, where the processor is described, registered, etc. It also currently only works with Kafka as the transport protocol.

The main concern I have right now is how to really integrate it, since the goal should not be to reimplement everything in Python. Does anybody have a "smart" idea how we could tackle this problem?

Patrick

[1] https://github.com/apache/incubator-streampipes/tree/dev/streampipes-wrapper-python
[2] https://issues.apache.org/jira/browse/STREAMPIPES-174

> On 18.07.2020 at 15:41, Patrick Wiener <[email protected]> wrote:
>
> To give you a sneak peek, it currently looks like this on the Python side.
> However, note that the main magic (registration, model declaration, etc.)
> still happens on the Java side.
>
> def main():
>     # Map each pipeline element ID to the class that implements it
>     processors = {
>         'org.streampipes.pe.processors.python.simple': SimpleProcessor,
>         'org.streampipes.pe.processors.python.filter': ThresholdFilter,
>     }
>
>     Declarer.add(processors=processors)
>     StandaloneSubmitter.init()
>
>
> if __name__ == '__main__':
>     main()
>
> An example threshold filter:
>
> import operator
>
>
> class ThresholdFilter(EventProcessor):
>
>     threshold = None
>     filter_property = None
>     operator = None
>
>     def on_invocation(self):
>         # Read the user-configured parameters once per invocation
>         self.threshold = self.static_properties.get('threshold')
>         self.filter_property = self.static_properties.get('filter_property')
>         self.operator = self.static_properties.get('operation')
>
>     def on_event(self, event):
>         # Forward the event only if it satisfies the comparison
>         if self.eval_operator(op=self.operator,
>                               value=event[self.filter_property],
>                               threshold=self.threshold):
>             return event
>
>     def on_detach(self):
>         pass
>
>     @staticmethod
>     def eval_operator(op=None, value=None, threshold=None):
>         # Map names to functions so only the selected comparison runs
>         switcher = {
>             'LE': operator.le,
>             'LT': operator.lt,
>             'EQ': operator.eq,
>             'GT': operator.gt,
>             'GE': operator.ge,
>             'IE': operator.ne,
>         }
>         if op not in switcher:
>             # A returned error string would be truthy and pass every event
>             raise ValueError("Invalid operator")
>         return switcher[op](value, threshold)
>
>
>
> Patrick
>
>
>> On 16.07.2020 at 23:29, Dominik Riemer <[email protected]> wrote:
>>
>> Hi,
>>
>> I'm fully +1 for a complete, plain Python wrapper that integrates both
>> runtime and controller interfaces! Also, given our microservice
>> architecture with standalone pipeline elements that communicate over
>> JSON/JSON-LD, I don't think we need any code-level integration between
>> Python and Java.
>>
>> Concerning the code structure, I'd suggest creating a
>> streampipes-wrapper-python module in the core project, adding the Python
>> code there, and adding an example to the streampipes-examples project that
>> uses the current Java ExternalEventProcessor and explains how to use the
>> Python wrapper. By adding the Python code to the core project, all wrappers
>> would be located in the same repository, while the extensions project
>> solely provides specific pipeline elements and adapters.
>> In the meantime, we could add the missing features to the Python wrapper.
>> I agree that it is some work, but it should mainly consist of parsing the
>> graphs (we could use JSON instead of JSON-LD here to simplify parsing),
>> extracting parameters and adding some Flask endpoints.
>>
>> As I'm not that familiar with Python, are there any Python experts on the
>> list who want to help build the wrapper?
>> I'd expect that finishing the wrapper could probably be done within a few
>> days if there is a Python expert and someone who is familiar with the
>> StreamPipes model - I'd be happy to support the model side 😉
>>
>> Dominik
>>
>>
>> -----Original Message-----
>> From: Philipp Zehnder <[email protected]>
>> Sent: Thursday, July 16, 2020 11:09 PM
>> To: [email protected]
>> Subject: Re: Adding StreamPipes Python wrapper
>>
>> Hi guys,
>>
>> I am also in favor of integrating the current prototype of the Python
>> wrapper for further development.
>> I would also like to discuss what a proper integration might look like.
>> The cleanest way would indeed be to implement all the StreamPipes
>> interfaces and models in Python, but I fear this is a lot of work and will
>> take quite some time.
>> Is there a better way, or does anyone have experience integrating Python
>> code into Java?
>>
>> For the first integration, I would suggest creating a module in the
>> extensions project and putting all the code there.
>> We currently use the interfaces of the Java wrapper, right? So we do not
>> have any Python-specific endpoints.
>> I think this would ease the usage for people in the community and let them
>> already try an early version of the wrapper.
>> Alternatively, we can put it into the core in streampipes-wrapper-python
>> as you suggested, but then a user has to check out the backend and the
>> extensions project to develop a new processor.
>> What's your opinion on that?
>>
>> Philipp
>>
>>
>>
>>> On 16. Jul 2020, at 20:41, Patrick Wiener <[email protected]> wrote:
>>>
>>> Hi Grainier,
>>>
>>> Definitely, it should make it super simple to integrate various
>>> well-known Python libs. The only real limitation is that they'll also
>>> have to work in an event-driven fashion.
>>>
>>> I guess the cleanest way would be to port the Java wrapper to Python to
>>> finally have something like "pip install streampipes-python". Right now
>>> in the prototype we have a special ExternalEventProcessor [1] that only
>>> acts in onInvocation() and onDetach(), forwarding the request to a Flask
>>> endpoint in Python.
>>>
>>> Do you have experience with running Python + Java projects "together"?
>>> I saw Flink is using py4j [2].
>>>
>>> What do you think about porting it all to Python?
>>>
>>> Patrick
>>>
>>> [1] https://github.com/apache/incubator-streampipes/blob/dev/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ExternalEventProcessor.java
>>> [2] https://www.py4j.org/
>>>
>>>
>>>> On 16.07.2020 at 14:42, Grainier Perera <[email protected]> wrote:
>>>>
>>>> Hi Patrick,
>>>>
>>>> This will be very useful. We can use this to expose the capabilities
>>>> of popular libraries such as scikit-learn, SciPy, etc. By the way,
>>>> how does this work? Will it use a Java bridge, Jython or something
>>>> similar?
>>>>
>>>> Grainier Perera.
>>>>
>>>>
>>>> On Thu, 16 Jul 2020 at 13:42, Patrick Wiener <[email protected]> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> this mail is to inform you about, and to discuss, the addition of a
>>>>> new wrapper for StreamPipes: StreamPipes Python Wrapper
>>>>>
>>>>> Current wrappers such as standalone (JVM) or distributed (Flink)
>>>>> already allow us to develop new processors in the given runtime
>>>>> environment. I propose to add the Python wrapper to this family.
>>>>>
>>>>> Why Python wrapper?
>>>>>
>>>>> * Python is a widely used language, especially in the domain of data
>>>>>   science
>>>>> * Python is more concise and thus easier to read
>>>>> * We provide more options for standalone algorithms: it allows
>>>>>   newcomers unfamiliar with Java to implement their algorithms faster
>>>>>
>>>>> Current implementation:
>>>>>
>>>>> Currently it only works when implementing declareModel() as part of
>>>>> the controller in Java and sending the invocation request to Python
>>>>> on the receiver side. Thus, it is necessary to run both Java + Python
>>>>> in one container. While it works, this should of course not be the
>>>>> standard way to do it.
>>>>>
>>>>> As mentioned, I have already started a very basic implementation
>>>>> that I would add to the core project under
>>>>> streampipes-wrapper-python. Or do you have any other thoughts?
>>>>>
>>>>> I am happy to discuss this topic with you and hope that some of you
>>>>> are eager to help work on the Python wrapper.
>>>>>
>>>>> What are your thoughts?
>>>>>
>>>>> Patrick
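
For readers following the thread, the flow discussed above (the Java side forwards an invocation request, the Python side looks up the processor class, configures it and feeds it events) could look roughly like the sketch below. It is only an illustration under assumptions: the JSON field names (element_id, invocation_id, static_properties), the REGISTRY and RUNNING dictionaries, the handle_* functions and the stand-in EventProcessor base class are all hypothetical and not taken from the actual wrapper code, and the Flask and Kafka plumbing is left out entirely.

```python
import json
import operator

# Comparison functions keyed by the operator names used in the thread's example.
OPERATORS = {
    'LE': operator.le, 'LT': operator.lt, 'EQ': operator.eq,
    'GT': operator.gt, 'GE': operator.ge, 'IE': operator.ne,
}


class EventProcessor:
    """Hypothetical stand-in for the wrapper's base class."""

    def __init__(self, static_properties):
        self.static_properties = static_properties

    def on_invocation(self):
        pass

    def on_event(self, event):
        return event

    def on_detach(self):
        pass


class ThresholdFilter(EventProcessor):
    def on_invocation(self):
        # Resolve the configuration once, when the pipeline starts.
        self.threshold = self.static_properties['threshold']
        self.filter_property = self.static_properties['filter_property']
        self.compare = OPERATORS[self.static_properties['operation']]

    def on_event(self, event):
        # Return the event if it passes the filter, otherwise None.
        if self.compare(event[self.filter_property], self.threshold):
            return event
        return None


# Assumed registry: pipeline element ID -> implementing class.
REGISTRY = {'org.streampipes.pe.processors.python.filter': ThresholdFilter}
# Processors that are currently running, keyed by an assumed invocation ID.
RUNNING = {}


def handle_invocation(payload):
    """Start a processor from a JSON invocation request (payload shape assumed)."""
    request = json.loads(payload)
    processor = REGISTRY[request['element_id']](request['static_properties'])
    processor.on_invocation()
    RUNNING[request['invocation_id']] = processor
    return processor


def handle_detach(invocation_id):
    """Stop a running processor and forget it."""
    RUNNING.pop(invocation_id).on_detach()


if __name__ == '__main__':
    demo = handle_invocation(json.dumps({
        'element_id': 'org.streampipes.pe.processors.python.filter',
        'invocation_id': 'demo-1',
        'static_properties': {'threshold': 20,
                              'filter_property': 'temperature',
                              'operation': 'GT'},
    }))
    for evt in ({'temperature': 15}, {'temperature': 25}):
        print(evt, '->', demo.on_event(evt))
    handle_detach('demo-1')
```

In a real wrapper, handle_invocation and handle_detach would presumably sit behind the HTTP endpoints mentioned in the thread, and on_event would be driven by the message broker consumer rather than a loop.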
