This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch python-platform in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit e0da05b20b23713f528a8be93eb81bf1d9721392 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Thu Apr 7 18:51:16 2022 +0200 [WAYANG-#8] add dataquanta Tests and small corrections Signed-off-by: bertty <[email protected]> --- python/old_code/test.py | 4 +- python/src/pywy/dataquanta.py | 38 +++--- python/src/pywy/operators/__init__.py | 3 +- .../src/pywy/tests/unit/dataquanta/context_test.py | 2 +- .../pywy/tests/unit/dataquanta/dataquanta_test.py | 134 +++++++++++++++++++++ 5 files changed, 158 insertions(+), 23 deletions(-) diff --git a/python/old_code/test.py b/python/old_code/test.py index 9a2b544f..1b3111c5 100644 --- a/python/old_code/test.py +++ b/python/old_code/test.py @@ -31,9 +31,9 @@ for index in range(0, 1): tic = time.perf_counter() fileop = WayangContext()\ .register(python)\ - .textFile("/Users/bertty/databloom/blossom/python/resources/tmp"+str(index))\ + .textfile("/Users/bertty/databloom/blossom/python/resources/tmp" + str(index))\ .filter(pre)\ - .storeTextFile("/Users/bertty/databloom/blossom/python/resources/out"+str(index)) + .store_textfile("/Users/bertty/databloom/blossom/python/resources/out" + str(index)) toc = time.perf_counter() print(f"Downloaded the tutorial in {toc - tic:0.4f} seconds") diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py index 9f10eba2..0b9483d3 100644 --- a/python/src/pywy/dataquanta.py +++ b/python/src/pywy/dataquanta.py @@ -1,11 +1,12 @@ -from typing import Set +from typing import Set, List, cast from pywy.core import Translator -from pywy.types import ( GenericTco, Predicate, Function, FlatmapFunction, IterableO ) +from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO, T, I, O) from pywy.operators import * from pywy.core import PywyPlan from pywy.core import Plugin + class WayangContext: """ This is the entry point for users to work with Wayang. @@ -18,6 +19,7 @@ class WayangContext: """ add a :class:`Plugin` to the :class:`Context` """ + def register(self, *plugins: Plugin): for p in plugins: self.plugins.add(p) @@ -26,12 +28,13 @@ class WayangContext: """ remove a :class:`Plugin` from the :class:`Context` """ + def unregister(self, *plugins: Plugin): for p in plugins: self.plugins.remove(p) return self - def textFile(self, file_path: str) -> 'DataQuanta[str]': + def textfile(self, file_path: str) -> 'DataQuanta[str]': return DataQuanta(self, TextFileSource(file_path)) def __str__(self): @@ -40,43 +43,40 @@ class WayangContext: def __repr__(self): return self.__str__() + class DataQuanta(GenericTco): """ Represents an intermediate result/data flow edge in a [[WayangPlan]]. """ - previous : PywyOperator = None context: WayangContext - def __init__(self, context:WayangContext, operator: PywyOperator): + def __init__(self, context: WayangContext, operator: PywyOperator): self.operator = operator self.context = context - def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]" : - return DataQuanta(self.context, self.__connect(FilterOperator(p))) + def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]": + return DataQuanta(self.context, self._connect(FilterOperator(p))) - def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]" : - return DataQuanta(self.context,self.__connect(MapOperator(f))) + def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]": + return DataQuanta(self.context, self._connect(MapOperator(f))) - def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]" : - return DataQuanta(self.context,self.__connect(FlatmapOperator(f))) + def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]": + return DataQuanta(self.context, self._connect(FlatmapOperator(f))) - def storeTextFile(self: "DataQuanta[I]", path: str) : - last = self.__connect(TextFileSink(path)) - plan = PywyPlan(self.context.plugins, [last]) + def store_textfile(self: "DataQuanta[I]", path: str): + last: List[SinkOperator] = [cast(SinkOperator, self._connect(TextFileSink(path)))] + plan = PywyPlan(self.context.plugins, last) plug = self.context.plugins.pop() - trs: Translator = Translator(plug, plan) + trs: Translator = Translator(plug, plan) new_plan = trs.translate() plug.get_executor().execute(new_plan) # TODO add the logic to execute the plan - def __connect(self, op:PywyOperator, port_op: int = 0) -> PywyOperator: + def _connect(self, op: PywyOperator, port_op: int = 0) -> PywyOperator: self.operator.connect(0, op, port_op) return op - def getOperator(self): - return self.operator - def __str__(self): return str(self.operator) diff --git a/python/src/pywy/operators/__init__.py b/python/src/pywy/operators/__init__.py index 68306811..4c1c7181 100644 --- a/python/src/pywy/operators/__init__.py +++ b/python/src/pywy/operators/__init__.py @@ -1,5 +1,5 @@ from pywy.operators.base import PywyOperator -from pywy.operators.sink import TextFileSink +from pywy.operators.sink import TextFileSink, SinkOperator from pywy.operators.source import TextFileSource from pywy.operators.unary import FilterOperator, MapOperator, FlatmapOperator # @@ -8,6 +8,7 @@ __ALL__= [ TextFileSink, TextFileSource, FilterOperator, + SinkOperator # MapOperator, # FlatmapOperator ] \ No newline at end of file diff --git a/python/src/pywy/tests/unit/dataquanta/context_test.py b/python/src/pywy/tests/unit/dataquanta/context_test.py index a74c28ba..43e7e376 100644 --- a/python/src/pywy/tests/unit/dataquanta/context_test.py +++ b/python/src/pywy/tests/unit/dataquanta/context_test.py @@ -79,7 +79,7 @@ class TestUnitDataquantaContext(unittest.TestCase): self.assertIsInstance(context, WayangContext) self.assertEqual(len(context.plugins), 0) - dataQuanta = context.textFile(path) + dataQuanta = context.textfile(path) self.assertIsInstance(dataQuanta, DataQuanta) self.assertIsInstance(dataQuanta.operator, TextFileSource) diff --git a/python/src/pywy/tests/unit/dataquanta/dataquanta_test.py b/python/src/pywy/tests/unit/dataquanta/dataquanta_test.py new file mode 100644 index 00000000..9739307b --- /dev/null +++ b/python/src/pywy/tests/unit/dataquanta/dataquanta_test.py @@ -0,0 +1,134 @@ +import unittest +from typing import Tuple, Callable +from unittest.mock import Mock + +from pywy.dataquanta import WayangContext +from pywy.dataquanta import DataQuanta +from pywy.operators import * + + +class TestUnitCoreTranslator(unittest.TestCase): + context: WayangContext + + def setUp(self): + self.context = Mock() + pass + + def build_seed(self) -> Tuple[PywyOperator, DataQuanta]: + operator = PywyOperator("Empty") + dq = DataQuanta(self.context, operator) + return operator, dq + + def test_create(self): + (operator, dq) = self.build_seed() + + self.assertIsInstance(dq, DataQuanta) + self.assertEqual(dq.context, self.context) + self.assertEqual(dq.operator, operator) + + def test_connect(self): + operator = PywyOperator("Empty1") + operator2 = PywyOperator("Empty2") + dq = DataQuanta(self.context, operator) + + self.assertIsNone(operator2.inputOperator[0]) + after_operator2 = dq._connect(operator2) + self.assertEqual(operator2, after_operator2) + self.assertIsNotNone(operator2.inputOperator[0]) + self.assertEqual(operator, operator2.inputOperator[0]) + self.assertEqual(operator.outputOperator[0], operator2) + + def validate_filter(self, filtered: DataQuanta, operator: PywyOperator): + self.assertIsInstance(filtered, DataQuanta) + self.assertIsInstance(filtered.operator, FilterOperator) + self.assertEqual(filtered.context, self.context) + self.assertNotEqual(filtered.operator, operator) + self.assertEqual(filtered.operator.inputOperator[0], operator) + + def test_filter_lambda(self): + (operator, dq) = self.build_seed() + pred: Callable = lambda x: "" in x + filtered = dq.filter(pred) + self.validate_filter(filtered, operator) + + def test_filter_func(self): + (operator, dq) = self.build_seed() + + def pred(x: str) -> bool: + return "" in x + + filtered = dq.filter(pred) + self.validate_filter(filtered, operator) + + def test_filter_func_lambda(self): + (operator, dq) = self.build_seed() + + def pred(x): + return "" in x + + filtered = dq.filter(lambda x: pred(x)) + self.validate_filter(filtered, operator) + + def validate_map(self, mapped: DataQuanta, operator: PywyOperator): + self.assertIsInstance(mapped, DataQuanta) + self.assertIsInstance(mapped.operator, MapOperator) + self.assertEqual(mapped.context, self.context) + self.assertNotEqual(mapped.operator, operator) + self.assertEqual(mapped.operator.inputOperator[0], operator) + + def test_map_lambda(self): + (operator, dq) = self.build_seed() + func: Callable = lambda x: len(x) + mapped = dq.map(func) + self.validate_map(mapped, operator) + + def test_map_func(self): + (operator, dq) = self.build_seed() + + def func(x: str) -> int: + return len(x) + + mapped = dq.map(func) + self.validate_map(mapped, operator) + + def test_map_func_lambda(self): + (operator, dq) = self.build_seed() + + def func(x): + return x == 0 + + mapped = dq.map(lambda x: func(x)) + self.validate_map(mapped, operator) + + def validate_flatmap(self, flatted: DataQuanta, operator: PywyOperator): + self.assertIsInstance(flatted, DataQuanta) + self.assertIsInstance(flatted.operator, FlatmapOperator) + self.assertEqual(flatted.context, self.context) + self.assertNotEqual(flatted.operator, operator) + self.assertEqual(flatted.operator.inputOperator[0], operator) + + def test_flatmap_lambda(self): + (operator, dq) = self.build_seed() + func: Callable = lambda x: x.split(" ") + flatted = dq.flatmap(func) + self.validate_flatmap(flatted, operator) + + def test_flatmap_func(self): + (operator, dq) = self.build_seed() + + def fmfunc(i: str) -> str: + for x in range(len(i)): + yield str(x) + + flatted = dq.flatmap(fmfunc) + self.validate_flatmap(flatted, operator) + + def test_flatmap_func_lambda(self): + (operator, dq) = self.build_seed() + + def fmfunc(i): + for x in range(len(i)): + yield str(x) + + flatted = dq.flatmap(lambda x: fmfunc(x)) + self.validate_flatmap(flatted, operator)
