I pushed the current work to the core repo under streampipes-wrapper-python 
[1]. Additionally, I created an Issue to track the
tasks for adding the wrapper in Jira [2].

As said, this still heavily relies on the corresponding counterpart in Java 
where the processor is described, registered etc.
And it currently only works with Kafka as the go-to transport protocol.

The main concern I have right now is how to really integrate it as the goal 
should not be to reimplement everything in Python.

Does anybody have a „smart“ idea how we could tackle this problem? 

Patrick 

[1] 
https://github.com/apache/incubator-streampipes/tree/dev/streampipes-wrapper-python
 
<https://github.com/apache/incubator-streampipes/tree/dev/streampipes-wrapper-python>
[2] https://issues.apache.org/jira/browse/STREAMPIPES-174 
<https://issues.apache.org/jira/browse/STREAMPIPES-174>

> Am 18.07.2020 um 15:41 schrieb Patrick Wiener <[email protected]>:
> 
> To give you a sneak peak - it currently looks like this on the python side. 
> However, note that the main magic (registration, model declaration etc) 
> still happens on the java side.
> 
> def main():
>     processors = {
>         'org.streampipes.pe.processors.python.simple': SimpleProcessor,
>         'org.streampipes.pe.processors.python.filter': ThresholdFilter,
>     }
> 
>     Declarer.add(processors=processors)
>     StandaloneSubmitter.init()
> 
> 
> if __name__ == '__main__':
>     main()
> 
> An example Threshold filter:
> 
> class ThresholdFilter(EventProcessor):
> 
>     threshold = None
>     filter_property = None
>     operator = None
> 
>     def on_invocation(self):
>         self.threshold = self.static_properties.get('threshold')
>         self.filter_property = self.static_properties.get('filter_property')
>         self.operator = self.static_properties.get('operation')
> 
>     def on_event(self, event):
>         if self.eval_operator(op=self.operator,
>                               value=event[self.filter_property],
>                               threshold=self.threshold):
>             return event
> 
>     def on_detach(self):
>         pass
> 
>     @staticmethod
>     def eval_operator(op=None, value=None, threshold=None):
>         switcher = {
>             'LE': operator.le(value, threshold),
>             'LT': operator.lt(value, threshold),
>             'EQ': operator.eq(value, threshold),
>             'GT': operator.gt(value, threshold),
>             'GE': operator.ge(value, threshold),
>             'IE': operator.ne(value, threshold)
>         }
>         return switcher.get(op, "Invalid operator“)
> 
> 
> 
> Patrick
> 
> 
>> Am 16.07.2020 um 23:29 schrieb Dominik Riemer <[email protected] 
>> <mailto:[email protected]>>:
>> 
>> Hi,
>> 
>> I'm fully +1 for a complete, plain python wrapper that integrates both 
>> runtime and controller interfaces! Also, given our microservice architecture 
>> with standalone pipeline elements that communicate over JSON/JSON-LD I don't 
>> think we need any code-level integration between Python and Java. 
>> 
>> Concerning the code structure, I'd suggest to create a 
>> streampipes-wrapper-python module in the core project, add the Python code 
>> there and to create an example using the current Java ExternalEventProcessor 
>> into the streampipes-examples project that explains how to use the Python 
>> wrapper. By adding the Python code to the core project, all wrappers would 
>> be located in the same repository, while the extensions project solely 
>> provides specific pipeline elements and adapters.
>> In the meantime, we could add the missing features to the Python wrapper. I 
>> agree that it is some work, but it should mainly consist of parsing the 
>> graphs (we could use JSON instead of JSON-LD here to simplify parsing), 
>> extracting parameters and adding some Flask endpoints.
>> 
>> As I'm not that familiar with Python, are there any Python experts on the 
>> list who want to help building the wrapper? I'd expect that finishing the 
>> wrapper could probably be done within a few days if there is a Python expert 
>> and someone who is familiar with the StreamPipes model - I'd be happy to 
>> support the model side 😉
>> 
>> Dominik 
>> 
>> 
>> -----Original Message-----
>> From: Philipp Zehnder <[email protected] <mailto:[email protected]>> 
>> Sent: Thursday, July 16, 2020 11:09 PM
>> To: [email protected] <mailto:[email protected]>
>> Subject: Re: Adding StreamPipes Python wrapper
>> 
>> Hi guys,
>> 
>> I am also in favor of integrating the current prototype of the python 
>> wrapper for further development.
>> I would also like to discuss how the proper integration might look like. 
>> The cleanest way would indeed be to implement all the StreamPipes interfaces 
>> and models in python, but I fear this is a lot of work and will take quite 
>> some time.
>> Is there a better way, or does anyone have experience integrating Python 
>> code into Java?
>> 
>> As for the first integration I would suggest to create a module in the 
>> extensions project and put all the code there. 
>> We currently use the interfaces of the Java wrapper, right? So we do not 
>> have any python specific endpoints.
>> I think this would ease the usage for people in the community and already 
>> try an early version of the wrapper.
>> Alternatively, we can put it into the core in streampipes-wrapper-python as 
>> you suggested, but then a user has to checkout the backend and the 
>> extensions project to develop a new processor.
>> Whats your opinion on that?
>> 
>> Philipp
>> 
>> 
>> 
>>> On 16. Jul 2020, at 20:41, Patrick Wiener <[email protected] 
>>> <mailto:[email protected]>> wrote:
>>> 
>>> Hi Grainier,
>>> 
>>> Definitely, it should make it super simple to integrate various well 
>>> known Python libs. The only real limitation is that they’ll also have 
>>> to work in an event-driven fashion.
>>> 
>>> I guess the most clean way would be to port the Java wrapper to Python 
>>> to finally have something such "pip install streampipes-python“. Right 
>>> now in the prototype we have a special ExternalEventProcessor [1] that 
>>> only calls in the
>>> onInvocation() and onDetach() and forwards the request to a Flask 
>>> endpoint in Python.
>>> 
>>> Do you have experience with running Python + Java projects „together“?
>>> I saw Flink is using py4j [2]. 
>>> 
>>> What do you think about porting it all to Python? 
>>> 
>>> Patrick
>>> 
>>> [1] 
>>> https://github.com/apache/incubator-streampipes/blob/dev/streampipes-w 
>>> <https://github.com/apache/incubator-streampipes/blob/dev/streampipes-w>
>>> rapper/src/main/java/org/apache/streampipes/wrapper/runtime/ExternalEv
>>> entProcessor.java 
>>> <https://github.com/apache/incubator-streampipes/blob/dev/streampipes-
>>> wrapper/src/main/java/org/apache/streampipes/wrapper/runtime/ExternalE
>>> ventProcessor.java> [2] https://www.py4j.org/ <https://www.py4j.org/>
>>> 
>>> 
>>>> Am 16.07.2020 um 14:42 schrieb Grainier Perera <[email protected]>:
>>>> 
>>>> Hi Patrick,
>>>> 
>>>> This will be very useful. We can use this to expose the capabilities 
>>>> of popular libraries such as scikit-learn, SciPy, etc... By the way, 
>>>> How this works? Will it use java bridge, Jython or something similar?
>>>> 
>>>> Grainier Perera.
>>>> 
>>>> 
>>>> On Thu, 16 Jul 2020 at 13:42, Patrick Wiener <[email protected]> wrote:
>>>> 
>>>>> Hi guys,
>>>>> 
>>>>> this mail is to inform you and discuss the addition of a new wrapper 
>>>>> for
>>>>> StreamPipes: StreamPipes Python Wrapper
>>>>> 
>>>>> Current wrappers such as standalone (JVM) or distributed (Flink) 
>>>>> already allow us to develop new processors in the given runtime 
>>>>> environment. I suppose to add the Python wrapper to this family.
>>>>> 
>>>>> Why Python wrapper?
>>>>> 
>>>>> * Python is a widely used language especially in the domain of data 
>>>>> science
>>>>> * Python is more concise and thus better to read
>>>>> * We provide more options for standalone algorithms: It allows 
>>>>> newcomers unfamiliar with Java to faster implement their algorithmns
>>>>> 
>>>>> Current implementation:
>>>>> 
>>>>> Currently it only works when implementing the declareModel() as part 
>>>>> of the controller in Java and sending the invocation request to 
>>>>> Python on the receiver side. Thus, it is necessary to run both Java 
>>>>> + Python in one container . While it works, this should of course 
>>>>> not be the standard way to do it.
>>>>> 
>>>>> As said, I already started a very very basic implementation of it 
>>>>> that I would add it to the core project under 
>>>>> streampipes-wrapper-python or do you have any other thoughts?
>>>>> 
>>>>> I am happy to discuss this topic with you and hope that some of you 
>>>>> are eager to help working on the Python wrapper.
>>>>> 
>>>>> What are your thoughts?
>>>>> 
>>>>> Patrick
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>> 
>> 
>> 
> 

Reply via email to