This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch python-platform in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 650e127c6f95d9df93311580a70257011b9d9c2a Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Wed Apr 6 12:11:34 2022 +0200 [WAYANG-#8] add TextFileSinkOperator Signed-off-by: bertty <[email protected]> --- python/src/pywayang/dataquanta.py | 5 +++ python/src/pywayang/operator/sink.py | 28 +++++++++++++++++ python/src/pywayang/platforms/python/mappings.py | 1 + .../python/operators/PyTextFileSinkOperator.py | 36 ++++++++++++++++++++++ .../platforms/python/operators/__init__.py | 4 ++- 5 files changed, 73 insertions(+), 1 deletion(-) diff --git a/python/src/pywayang/dataquanta.py b/python/src/pywayang/dataquanta.py index 3ccc2839..9cd892eb 100644 --- a/python/src/pywayang/dataquanta.py +++ b/python/src/pywayang/dataquanta.py @@ -1,6 +1,7 @@ from pywayang.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO) from pywayang.operator.base import (WyOperator) from pywayang.operator.unary import (FilterOperator, MapOperator, FlatmapOperator) +from pywayang.operator.sink import TextFileSink class DataQuanta(GenericTco): @@ -22,6 +23,10 @@ class DataQuanta(GenericTco): def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]" : return DataQuanta(FlatmapOperator(f)) + def storeTextFile(self: "DataQuanta[I]", path: str) : + last = DataQuanta(TextFileSink(path)) + # TODO add the logic to execute the plan + def getOperator(self): return self.operator diff --git a/python/src/pywayang/operator/sink.py b/python/src/pywayang/operator/sink.py new file mode 100644 index 00000000..52cbeb0e --- /dev/null +++ b/python/src/pywayang/operator/sink.py @@ -0,0 +1,28 @@ +from pywayang.operator.base import WyOperator + +class SinkUnaryOperator(WyOperator): + + def __init__(self, name:str): + super().__init__(name, None, str, 0, 1) + + def __str__(self): + return super().__str__() + + def __repr__(self): + return super().__repr__() + + + +class TextFileSink(SinkUnaryOperator): + + path: str + + def __init__(self, path: str): + super().__init__('TextFileSink') + self.path = path + + def __str__(self): + return super().__str__() + + def __repr__(self): + return super().__repr__() \ No newline at end of file diff --git a/python/src/pywayang/platforms/python/mappings.py b/python/src/pywayang/platforms/python/mappings.py index 55a80180..7060ba07 100644 --- a/python/src/pywayang/platforms/python/mappings.py +++ b/python/src/pywayang/platforms/python/mappings.py @@ -33,4 +33,5 @@ OperatorMappings = Mapping() OperatorMappings.add_mapping(PyFilterOperator()) OperatorMappings.add_mapping(PyTextFileSourceOperator()) +OperatorMappings.add_mapping(PyTextFileSinkOperator()) diff --git a/python/src/pywayang/platforms/python/operators/PyTextFileSinkOperator.py b/python/src/pywayang/platforms/python/operators/PyTextFileSinkOperator.py new file mode 100644 index 00000000..6589a634 --- /dev/null +++ b/python/src/pywayang/platforms/python/operators/PyTextFileSinkOperator.py @@ -0,0 +1,36 @@ +from pywayang.operator.sink import TextFileSink +from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator +from pywayang.platforms.python.channels import ( + Channel, + ChannelDescriptor, + PyIteratorChannel, + PyIteratorChannelDescriptor + ) +from typing import Set + +class PyTextFileSinkOperator(TextFileSink, PythonExecutionOperator): + + def __init__(self, origin: TextFileSink = None): + path = None if origin is None else origin.path + super().__init__(path) + pass + + def execute(self, inputs: Channel, outputs: Channel): + self.validateChannels(inputs, outputs) + if isinstance(inputs[0], PyIteratorChannel) : + file = open(self.path,'w') + py_in_iter_channel: PyIteratorChannel = inputs[0] + iterable = py_in_iter_channel.provide_iterable(); + for element in iterable: + file.write(str(element)) + file.close() + + else: + raise Exception("Channel Type does not supported") + + + def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]: + return {PyIteratorChannelDescriptor} + + def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]: + raise Exception("The PyTextFileSource does not support Output Channels") diff --git a/python/src/pywayang/platforms/python/operators/__init__.py b/python/src/pywayang/platforms/python/operators/__init__.py index 5db92431..7a555422 100644 --- a/python/src/pywayang/platforms/python/operators/__init__.py +++ b/python/src/pywayang/platforms/python/operators/__init__.py @@ -1,9 +1,11 @@ from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator from pywayang.platforms.python.operators.PyFilterOperator import PyFilterOperator from pywayang.platforms.python.operators.PyTextFileSourceOperator import PyTextFileSourceOperator +from pywayang.platforms.python.operators.PyTextFileSinkOperator import PyTextFileSinkOperator __ALL__ = [ PythonExecutionOperator, PyFilterOperator, - PyTextFileSourceOperator + PyTextFileSourceOperator, + PyTextFileSinkOperator ] \ No newline at end of file
