xintongsong commented on code in PR #32: URL: https://github.com/apache/flink-agents/pull/32#discussion_r2161337602
########## python/flink_agents/examples/workflow_example.py: ########## @@ -0,0 +1,66 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import time +from collections import deque +from typing import Any + +from flink_agents.api.decorators import action +from flink_agents.api.event import Event, InputEvent, OutputEvent +from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment +from flink_agents.api.runner_context import RunnerContext +from flink_agents.api.workflow import Workflow + + +class MyEvent(Event): #noqa D101 + value: Any + +class MyWorkflow(Workflow): #noqa D101 Review Comment: The story of this example doesn't really make sense. IIUC, this is more like a temporary example for development and testing purposes. We should explain this in the docstring, and add a todo to remove it or replace it with something meaningful. 
########## python/flink_agents/examples/workflow_example.py: ########## @@ -0,0 +1,66 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import time +from collections import deque +from typing import Any + +from flink_agents.api.decorators import action +from flink_agents.api.event import Event, InputEvent, OutputEvent +from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment +from flink_agents.api.runner_context import RunnerContext +from flink_agents.api.workflow import Workflow + + +class MyEvent(Event): #noqa D101 + value: Any + +class MyWorkflow(Workflow): #noqa D101 + @action(InputEvent) + @staticmethod + def first_action(event: Event, ctx: RunnerContext): #noqa D102 + input = event.input + content = input + ' first_action' + ctx.send_event(MyEvent(value=content)) + ctx.send_event(OutputEvent(output=content)) + + @action(MyEvent) + @staticmethod + def second_action(event: Event, ctx: RunnerContext): #noqa D102 + input = event.value + content = input + ' second_action' + ctx.send_event(OutputEvent(output=content)) + + +if __name__ == "__main__": + 
env = AgentsExecutionEnvironment.get_execution_environment(module=__name__) + + input_queue = deque() + workflow = MyWorkflow() + + output_queue = env.from_input(input_queue).apply(workflow).to_output() + + env.execute() + + input_queue.append({'key': 'bob', 'value': 'The message from bob'}) + input_queue.append({'key': 'john', 'value': 'The message from john'}) + input_queue.append({'value': 'The message from unknow'}) # will automatically generate a new unique key + input_queue.append({'finish': True}) # mark source finish + + time.sleep(1) # wait a second to get outputs Review Comment: It's a bit too complex to have this `finish` and sleep things. I think we can simplify this by adding all the inputs into the input queue before executing the workflow, and make `env.execute()` a blocking call that finishes after consuming all the inputs and emitting all the outputs. In that case, we can also replace the queues with lists. ########## python/flink_agents/examples/workflow_example.py: ########## @@ -0,0 +1,66 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################# +import time +from collections import deque +from typing import Any + +from flink_agents.api.decorators import action +from flink_agents.api.event import Event, InputEvent, OutputEvent +from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment +from flink_agents.api.runner_context import RunnerContext +from flink_agents.api.workflow import Workflow + + +class MyEvent(Event): #noqa D101 + value: Any + +class MyWorkflow(Workflow): #noqa D101 + @action(InputEvent) + @staticmethod + def first_action(event: Event, ctx: RunnerContext): #noqa D102 + input = event.input + content = input + ' first_action' + ctx.send_event(MyEvent(value=content)) + ctx.send_event(OutputEvent(output=content)) + + @action(MyEvent) + @staticmethod + def second_action(event: Event, ctx: RunnerContext): #noqa D102 + input = event.value + content = input + ' second_action' + ctx.send_event(OutputEvent(output=content)) + + +if __name__ == "__main__": + env = AgentsExecutionEnvironment.get_execution_environment(module=__name__) + + input_queue = deque() + workflow = MyWorkflow() + + output_queue = env.from_input(input_queue).apply(workflow).to_output() + + env.execute() + + input_queue.append({'key': 'bob', 'value': 'The message from bob'}) + input_queue.append({'key': 'john', 'value': 'The message from john'}) + input_queue.append({'value': 'The message from unknow'}) # will automatically generate a new unique key Review Comment: typo 'unknown' ########## python/flink_agents/api/workflow.py: ########## @@ -0,0 +1,19 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +class Workflow: Review Comment: Abstract class. ########## python/flink_agents/examples/workflow_example.py: ########## @@ -0,0 +1,66 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################# +import time +from collections import deque +from typing import Any + +from flink_agents.api.decorators import action +from flink_agents.api.event import Event, InputEvent, OutputEvent +from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment +from flink_agents.api.runner_context import RunnerContext +from flink_agents.api.workflow import Workflow + + +class MyEvent(Event): #noqa D101 + value: Any + +class MyWorkflow(Workflow): #noqa D101 + @action(InputEvent) + @staticmethod + def first_action(event: Event, ctx: RunnerContext): #noqa D102 + input = event.input + content = input + ' first_action' + ctx.send_event(MyEvent(value=content)) + ctx.send_event(OutputEvent(output=content)) + + @action(MyEvent) + @staticmethod + def second_action(event: Event, ctx: RunnerContext): #noqa D102 + input = event.value + content = input + ' second_action' + ctx.send_event(OutputEvent(output=content)) + + +if __name__ == "__main__": + env = AgentsExecutionEnvironment.get_execution_environment(module=__name__) + + input_queue = deque() + workflow = MyWorkflow() + + output_queue = env.from_input(input_queue).apply(workflow).to_output() + + env.execute() + + input_queue.append({'key': 'bob', 'value': 'The message from bob'}) Review Comment: Dict is a bit complex here, as users need to use the correct keys. I'd suggest using a tuple. ########## python/flink_agents/runtime/local_runner.py: ########## @@ -0,0 +1,198 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import logging +import uuid +from collections import deque +from typing import Any, Dict, Optional +from uuid import UUID + +from typing_extensions import override + +from flink_agents.api.event import Event, InputEvent, OutputEvent +from flink_agents.api.runner_context import RunnerContext +from flink_agents.api.workflow import Workflow +from flink_agents.plan.workflow_plan import WorkflowPlan +from flink_agents.runtime.workflow_runner import WorkflowRunner + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +class LocalRunnerContext(RunnerContext): + """Implementation of WorkflowRunnerContext for local workflow execution. + + Attributes: + ---------- + __workflow_plan : WorkflowPlan + Internal workflow plan for this context. + __id : UUID + Unique identifier for the context. + events : deque[Event] + Queue of events to be processed in this context. + outputs : deque[Any] + Queue of outputs generated by workflow execution. + """ + + __workflow_plan: WorkflowPlan + __id: UUID + events: deque[Event] + outputs: deque[Any] + + def __init__(self, workflow_plan: WorkflowPlan, id: UUID) -> None: + """Initialize a new session with the given workflow and ID. + + Parameters + ---------- + workflow : Workflow + Workflow plan used for this context. + id : UUID + Unique context identifier. If None, a new UUID will be generated. 
+ """ + self.__workflow_plan = workflow_plan + if id is None: + id = uuid.uuid4() + self.__id = id + self.events = deque() + self.outputs = deque() + + @property + def session_id(self) -> UUID: Review Comment: ```suggestion def key(self) -> Any: ``` ########## python/flink_agents/api/tests/test_decorators.py: ########## @@ -0,0 +1,32 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from flink_agents.api.decorators import action +from flink_agents.api.event import Event, InputEvent, OutputEvent +from flink_agents.api.runner_context import RunnerContext + + +def test_action_decorator() -> None: #noqa D103 Review Comment: I think we need more test cases, for: - One action listens to multiple events - Negative cases for illegal signature - Negative cases for listening to non-event types ########## python/flink_agents/examples/workflow_example.py: ########## @@ -0,0 +1,66 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import time +from collections import deque +from typing import Any + +from flink_agents.api.decorators import action +from flink_agents.api.event import Event, InputEvent, OutputEvent +from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment +from flink_agents.api.runner_context import RunnerContext +from flink_agents.api.workflow import Workflow + + +class MyEvent(Event): #noqa D101 + value: Any + +class MyWorkflow(Workflow): #noqa D101 + @action(InputEvent) + @staticmethod + def first_action(event: Event, ctx: RunnerContext): #noqa D102 + input = event.input + content = input + ' first_action' + ctx.send_event(MyEvent(value=content)) + ctx.send_event(OutputEvent(output=content)) + + @action(MyEvent) + @staticmethod + def second_action(event: Event, ctx: RunnerContext): #noqa D102 + input = event.value + content = input + ' second_action' + ctx.send_event(OutputEvent(output=content)) + + +if __name__ == "__main__": + env = AgentsExecutionEnvironment.get_execution_environment(module=__name__) + + input_queue = deque() + workflow = MyWorkflow() + + output_queue = env.from_input(input_queue).apply(workflow).to_output() Review Comment: ```suggestion output_queue = 
env.from_queue(input_queue).apply(workflow).to_queue() ``` ########## python/flink_agents/examples/workflow_example.py: ########## @@ -0,0 +1,66 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################# +import time +from collections import deque +from typing import Any + +from flink_agents.api.decorators import action +from flink_agents.api.event import Event, InputEvent, OutputEvent +from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment +from flink_agents.api.runner_context import RunnerContext +from flink_agents.api.workflow import Workflow + + +class MyEvent(Event): #noqa D101 + value: Any + +class MyWorkflow(Workflow): #noqa D101 + @action(InputEvent) + @staticmethod + def first_action(event: Event, ctx: RunnerContext): #noqa D102 + input = event.input + content = input + ' first_action' + ctx.send_event(MyEvent(value=content)) + ctx.send_event(OutputEvent(output=content)) + + @action(MyEvent) + @staticmethod + def second_action(event: Event, ctx: RunnerContext): #noqa D102 + input = event.value + content = input + ' second_action' + ctx.send_event(OutputEvent(output=content)) + + +if __name__ == "__main__": + env = AgentsExecutionEnvironment.get_execution_environment(module=__name__) + + input_queue = deque() + workflow = MyWorkflow() + + output_queue = env.from_input(input_queue).apply(workflow).to_output() Review Comment: Because later we will also support DataStream and Table as input and output. ########## python/flink_agents/runtime/local_runner.py: ########## @@ -0,0 +1,198 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import logging +import uuid +from collections import deque +from typing import Any, Dict, Optional +from uuid import UUID + +from typing_extensions import override + +from flink_agents.api.event import Event, InputEvent, OutputEvent +from flink_agents.api.runner_context import RunnerContext +from flink_agents.api.workflow import Workflow +from flink_agents.plan.workflow_plan import WorkflowPlan +from flink_agents.runtime.workflow_runner import WorkflowRunner + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +class LocalRunnerContext(RunnerContext): + """Implementation of WorkflowRunnerContext for local workflow execution. + + Attributes: + ---------- + __workflow_plan : WorkflowPlan + Internal workflow plan for this context. + __id : UUID + Unique identifier for the context. + events : deque[Event] + Queue of events to be processed in this context. + outputs : deque[Any] + Queue of outputs generated by workflow execution. + """ + + __workflow_plan: WorkflowPlan + __id: UUID Review Comment: ```suggestion __key: Any ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
