Hi, just to give you a small update. I implemented the processor registration for configs (k,v) and service discovery in consul [1].
In addition, I started working on the API endpoints that enable to communicate with the pipeline management being part of StreamPipes backend. Therefore, I started porting the necessary endpoints [2] to Python thereby using flask and flask-classful combined with bjoern, a fast, ultra-lightweight WSGI server [3]. Feel free to check the open JIRA issues if you’re keen on contributing :) https://issues.apache.org/jira/browse/STREAMPIPES-174 <https://issues.apache.org/jira/browse/STREAMPIPES-174> Patrick [1] https://github.com/apache/incubator-streampipes/tree/dev/streampipes-wrapper-python/streampipes/utils <https://github.com/apache/incubator-streampipes/tree/dev/streampipes-wrapper-python/streampipes/utils> [2] https://github.com/apache/incubator-streampipes/tree/dev/streampipes-wrapper-python/streampipes/api <https://github.com/apache/incubator-streampipes/tree/dev/streampipes-wrapper-python/streampipes/api> [3] https://github.com/jonashaag/bjoern/ <https://github.com/jonashaag/bjoern/> > Am 20.07.2020 um 17:48 schrieb Patrick Wiener <wie...@apache.org>: > > I added a new SIP to confluent [1] > > Feel free to contribute to it and add your thoughts. I mainly think its good > to point out the design choices we make. > > Patrick > > [1] > https://cwiki.apache.org/confluence/display/STREAMPIPES/SIP-02+Python+wrapper > <https://cwiki.apache.org/confluence/display/STREAMPIPES/SIP-02+Python+wrapper> > > <https://cwiki.apache.org/confluence/display/STREAMPIPES/SIP-02+Python+wrapper > > <https://cwiki.apache.org/confluence/display/STREAMPIPES/SIP-02+Python+wrapper>> > >> Am 19.07.2020 um 17:50 schrieb Patrick Wiener <wie...@apache.org >> <mailto:wie...@apache.org>>: >> >> I pushed the current work to the core repo under streampipes-wrapper-python >> [1]. Additionally, I created an Issue to track the >> tasks for adding the wrapper in Jira [2]. >> >> As said, this still heavily relies on the corresponding counterpart in Java >> where the processor is described, registered etc. >> And it currently only works with Kafka as the go-to transport protocol. >> >> The main concern I have right now is how to really integrate it as the goal >> should not be to reimplement everything in Python. >> >> Does anybody have a „smart“ idea how we could tackle this problem? >> >> Patrick >> >> [1] >> https://github.com/apache/incubator-streampipes/tree/dev/streampipes-wrapper-python >> >> <https://github.com/apache/incubator-streampipes/tree/dev/streampipes-wrapper-python> >> >> <https://github.com/apache/incubator-streampipes/tree/dev/streampipes-wrapper-python >> >> <https://github.com/apache/incubator-streampipes/tree/dev/streampipes-wrapper-python>> >> [2] https://issues.apache.org/jira/browse/STREAMPIPES-174 >> <https://issues.apache.org/jira/browse/STREAMPIPES-174> >> <https://issues.apache.org/jira/browse/STREAMPIPES-174 >> <https://issues.apache.org/jira/browse/STREAMPIPES-174>> >> >>> Am 18.07.2020 um 15:41 schrieb Patrick Wiener <wie...@apache.org >>> <mailto:wie...@apache.org> <mailto:wie...@apache.org >>> <mailto:wie...@apache.org>>>: >>> >>> To give you a sneak peak - it currently looks like this on the python side. >>> However, note that the main magic (registration, model declaration etc) >>> still happens on the java side. >>> >>> def main(): >>> processors = { >>> 'org.streampipes.pe.processors.python.simple': SimpleProcessor, >>> 'org.streampipes.pe.processors.python.filter': ThresholdFilter, >>> } >>> >>> Declarer.add(processors=processors) >>> StandaloneSubmitter.init() >>> >>> >>> if __name__ == '__main__': >>> main() >>> >>> An example Threshold filter: >>> >>> class ThresholdFilter(EventProcessor): >>> >>> threshold = None >>> filter_property = None >>> operator = None >>> >>> def on_invocation(self): >>> self.threshold = self.static_properties.get('threshold') >>> self.filter_property = self.static_properties.get('filter_property') >>> self.operator = self.static_properties.get('operation') >>> >>> def on_event(self, event): >>> if self.eval_operator(op=self.operator, >>> value=event[self.filter_property], >>> threshold=self.threshold): >>> return event >>> >>> def on_detach(self): >>> pass >>> >>> @staticmethod >>> def eval_operator(op=None, value=None, threshold=None): >>> switcher = { >>> 'LE': operator.le(value, threshold), >>> 'LT': operator.lt(value, threshold), >>> 'EQ': operator.eq(value, threshold), >>> 'GT': operator.gt(value, threshold), >>> 'GE': operator.ge(value, threshold), >>> 'IE': operator.ne(value, threshold) >>> } >>> return switcher.get(op, "Invalid operator“) >>> >>> >>> >>> Patrick >>> >>> >>>> Am 16.07.2020 um 23:29 schrieb Dominik Riemer <rie...@apache.org >>>> <mailto:rie...@apache.org> <mailto:rie...@apache.org >>>> <mailto:rie...@apache.org>>>: >>>> >>>> Hi, >>>> >>>> I'm fully +1 for a complete, plain python wrapper that integrates both >>>> runtime and controller interfaces! Also, given our microservice >>>> architecture with standalone pipeline elements that communicate over >>>> JSON/JSON-LD I don't think we need any code-level integration between >>>> Python and Java. >>>> >>>> Concerning the code structure, I'd suggest to create a >>>> streampipes-wrapper-python module in the core project, add the Python code >>>> there and to create an example using the current Java >>>> ExternalEventProcessor into the streampipes-examples project that explains >>>> how to use the Python wrapper. By adding the Python code to the core >>>> project, all wrappers would be located in the same repository, while the >>>> extensions project solely provides specific pipeline elements and adapters. >>>> In the meantime, we could add the missing features to the Python wrapper. >>>> I agree that it is some work, but it should mainly consist of parsing the >>>> graphs (we could use JSON instead of JSON-LD here to simplify parsing), >>>> extracting parameters and adding some Flask endpoints. >>>> >>>> As I'm not that familiar with Python, are there any Python experts on the >>>> list who want to help building the wrapper? I'd expect that finishing the >>>> wrapper could probably be done within a few days if there is a Python >>>> expert and someone who is familiar with the StreamPipes model - I'd be >>>> happy to support the model side 😉 >>>> >>>> Dominik >>>> >>>> >>>> -----Original Message----- >>>> From: Philipp Zehnder <zehn...@apache.org <mailto:zehn...@apache.org> >>>> <mailto:zehn...@apache.org <mailto:zehn...@apache.org>>> >>>> Sent: Thursday, July 16, 2020 11:09 PM >>>> To: dev@streampipes.apache.org <mailto:dev@streampipes.apache.org> >>>> <mailto:dev@streampipes.apache.org <mailto:dev@streampipes.apache.org>> >>>> Subject: Re: Adding StreamPipes Python wrapper >>>> >>>> Hi guys, >>>> >>>> I am also in favor of integrating the current prototype of the python >>>> wrapper for further development. >>>> I would also like to discuss how the proper integration might look like. >>>> The cleanest way would indeed be to implement all the StreamPipes >>>> interfaces and models in python, but I fear this is a lot of work and will >>>> take quite some time. >>>> Is there a better way, or does anyone have experience integrating Python >>>> code into Java? >>>> >>>> As for the first integration I would suggest to create a module in the >>>> extensions project and put all the code there. >>>> We currently use the interfaces of the Java wrapper, right? So we do not >>>> have any python specific endpoints. >>>> I think this would ease the usage for people in the community and already >>>> try an early version of the wrapper. >>>> Alternatively, we can put it into the core in streampipes-wrapper-python >>>> as you suggested, but then a user has to checkout the backend and the >>>> extensions project to develop a new processor. >>>> Whats your opinion on that? >>>> >>>> Philipp >>>> >>>> >>>> >>>>> On 16. Jul 2020, at 20:41, Patrick Wiener <wie...@apache.org >>>>> <mailto:wie...@apache.org> <mailto:wie...@apache.org >>>>> <mailto:wie...@apache.org>>> wrote: >>>>> >>>>> Hi Grainier, >>>>> >>>>> Definitely, it should make it super simple to integrate various well >>>>> known Python libs. The only real limitation is that they’ll also have >>>>> to work in an event-driven fashion. >>>>> >>>>> I guess the most clean way would be to port the Java wrapper to Python >>>>> to finally have something such "pip install streampipes-python“. Right >>>>> now in the prototype we have a special ExternalEventProcessor [1] that >>>>> only calls in the >>>>> onInvocation() and onDetach() and forwards the request to a Flask >>>>> endpoint in Python. >>>>> >>>>> Do you have experience with running Python + Java projects „together“? >>>>> I saw Flink is using py4j [2]. >>>>> >>>>> What do you think about porting it all to Python? >>>>> >>>>> Patrick >>>>> >>>>> [1] >>>>> https://github.com/apache/incubator-streampipes/blob/dev/streampipes-w >>>>> <https://github.com/apache/incubator-streampipes/blob/dev/streampipes-w> >>>>> <https://github.com/apache/incubator-streampipes/blob/dev/streampipes-w >>>>> <https://github.com/apache/incubator-streampipes/blob/dev/streampipes-w>> >>>>> rapper/src/main/java/org/apache/streampipes/wrapper/runtime/ExternalEv >>>>> entProcessor.java >>>>> <https://github.com/apache/incubator-streampipes/blob/dev/streampipes- >>>>> <https://github.com/apache/incubator-streampipes/blob/dev/streampipes-> >>>>> <https://github.com/apache/incubator-streampipes/blob/dev/streampipes- >>>>> <https://github.com/apache/incubator-streampipes/blob/dev/streampipes->> >>>>> wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ExternalE >>>>> ventProcessor.java> [2] https://www.py4j.org/ <https://www.py4j.org/> >>>>> <https://www.py4j.org/ <https://www.py4j.org/>> <https://www.py4j.org/ >>>>> <https://www.py4j.org/> <https://www.py4j.org/ <https://www.py4j.org/>>> >>>>> >>>>> >>>>>> Am 16.07.2020 um 14:42 schrieb Grainier Perera <grain...@apache.org >>>>>> <mailto:grain...@apache.org> <mailto:grain...@apache.org >>>>>> <mailto:grain...@apache.org>>>: >>>>>> >>>>>> Hi Patrick, >>>>>> >>>>>> This will be very useful. We can use this to expose the capabilities >>>>>> of popular libraries such as scikit-learn, SciPy, etc... By the way, >>>>>> How this works? Will it use java bridge, Jython or something similar? >>>>>> >>>>>> Grainier Perera. >>>>>> >>>>>> >>>>>> On Thu, 16 Jul 2020 at 13:42, Patrick Wiener <wie...@apache.org >>>>>> <mailto:wie...@apache.org> <mailto:wie...@apache.org >>>>>> <mailto:wie...@apache.org>>> wrote: >>>>>> >>>>>>> Hi guys, >>>>>>> >>>>>>> this mail is to inform you and discuss the addition of a new wrapper >>>>>>> for >>>>>>> StreamPipes: StreamPipes Python Wrapper >>>>>>> >>>>>>> Current wrappers such as standalone (JVM) or distributed (Flink) >>>>>>> already allow us to develop new processors in the given runtime >>>>>>> environment. I suppose to add the Python wrapper to this family. >>>>>>> >>>>>>> Why Python wrapper? >>>>>>> >>>>>>> * Python is a widely used language especially in the domain of data >>>>>>> science >>>>>>> * Python is more concise and thus better to read >>>>>>> * We provide more options for standalone algorithms: It allows >>>>>>> newcomers unfamiliar with Java to faster implement their algorithmns >>>>>>> >>>>>>> Current implementation: >>>>>>> >>>>>>> Currently it only works when implementing the declareModel() as part >>>>>>> of the controller in Java and sending the invocation request to >>>>>>> Python on the receiver side. Thus, it is necessary to run both Java >>>>>>> + Python in one container . While it works, this should of course >>>>>>> not be the standard way to do it. >>>>>>> >>>>>>> As said, I already started a very very basic implementation of it >>>>>>> that I would add it to the core project under >>>>>>> streampipes-wrapper-python or do you have any other thoughts? >>>>>>> >>>>>>> I am happy to discuss this topic with you and hope that some of you >>>>>>> are eager to help working on the Python wrapper. >>>>>>> >>>>>>> What are your thoughts? >>>>>>> >>>>>>> Patrick